Test Failed
Push — master ( 79e936...04af7d )
by Jan
02:32 queued 12s
created

tests.ssg_test_suite.common.run_cmd_local()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.512

Importance

Changes 0
Metric Value
cc 1
eloc 5
nop 3
dl 0
loc 5
ccs 1
cts 5
cp 0.2
crap 1.512
rs 10
c 0
b 0
f 0
1 1
from __future__ import print_function
2
3 1
import os
4 1
import logging
5 1
import subprocess
6 1
from collections import namedtuple
7 1
import functools
8 1
import tarfile
9 1
import tempfile
10 1
import re
11 1
import shutil
12
13 1
from ssg.build_cpe import ProductCPEs
14 1
from ssg.build_yaml import Rule as RuleYAML
15 1
from ssg.constants import MULTI_PLATFORM_MAPPING
16 1
from ssg.constants import FULL_NAME_TO_PRODUCT_MAPPING
17 1
from ssg.constants import OSCAP_RULE
18 1
from ssg.jinja import process_file
19 1
from ssg.products import product_yaml_path, load_product_yaml
20 1
from ssg.rules import get_rule_dir_yaml, is_rule_dir
21 1
from ssg.rule_yaml import parse_prodtype
22 1
from ssg_test_suite.log import LogHelper
23
24 1
Scenario_run = namedtuple(
25
    "Scenario_run",
26
    ("rule_id", "script"))
27 1
Scenario_conditions = namedtuple(
28
    "Scenario_conditions",
29
    ("backend", "scanning_mode", "remediated_by", "datastream"))
30 1
Rule = namedtuple(
31
    "Rule", ["directory", "id", "short_id", "files"])
32
33 1
SSG_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
34
35 1
_BENCHMARK_DIRS = [
36
        os.path.abspath(os.path.join(SSG_ROOT, 'linux_os', 'guide')),
37
        os.path.abspath(os.path.join(SSG_ROOT, 'applications')),
38
        ]
39
40 1
_SHARED_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '../shared'))
41
42 1
REMOTE_USER = "root"
43 1
REMOTE_USER_HOME_DIRECTORY = "/root"
44 1
REMOTE_TEST_SCENARIOS_DIRECTORY = os.path.join(REMOTE_USER_HOME_DIRECTORY, "ssgts")
45
46 1
try:
47 1
    SSH_ADDITIONAL_OPTS = tuple(os.environ.get('SSH_ADDITIONAL_OPTIONS').split())
48 1
except AttributeError:
49
    # If SSH_ADDITIONAL_OPTIONS is not defined set it to empty tuple.
50 1
    SSH_ADDITIONAL_OPTS = tuple()
51
52 1
SSH_ADDITIONAL_OPTS = (
53
    "-o", "StrictHostKeyChecking=no",
54
    "-o", "UserKnownHostsFile=/dev/null",
55
) + SSH_ADDITIONAL_OPTS
56
57
58 1
def walk_through_benchmark_dirs(product=None):
59
    directories = _BENCHMARK_DIRS
60
    if product is not None:
61
        yaml_path = product_yaml_path(SSG_ROOT, product)
62
        product_base = os.path.dirname(yaml_path)
63
        product_yaml = load_product_yaml(yaml_path)
64
        benchmark_root = os.path.join(product_base, product_yaml['benchmark_root'])
65
        directories = [os.path.abspath(benchmark_root)]
66
67
    for dirname in directories:
68
        for dirpath, dirnames, filenames in os.walk(dirname):
69
            yield dirpath, dirnames, filenames
70
71
72 1
class Stage(object):
73 1
    NONE = 0
74 1
    PREPARATION = 1
75 1
    INITIAL_SCAN = 2
76 1
    REMEDIATION = 3
77 1
    FINAL_SCAN = 4
78
79
80 1
@functools.total_ordering
81 1
class RuleResult(object):
82 1
    STAGE_STRINGS = {
83
        "preparation",
84
        "initial_scan",
85
        "remediation",
86
        "final_scan",
87
    }
88
89
    """
90
    Result of a test suite testing rule under a scenario.
91
92
    Supports ordering by success - the most successful run orders first.
93
    """
94 1
    def __init__(self, result_dict=None):
95 1
        self.scenario = Scenario_run("", "")
96 1
        self.conditions = Scenario_conditions("", "", "", "")
97 1
        self.when = ""
98 1
        self.passed_stages = dict()
99 1
        self.passed_stages_count = 0
100 1
        self.success = False
101
102 1
        if result_dict:
103 1
            self.load_from_dict(result_dict)
104
105 1
    def load_from_dict(self, data):
106 1
        self.scenario = Scenario_run(data["rule_id"], data["scenario_script"])
107 1
        self.conditions = Scenario_conditions(
108
            data["backend"], data["scanning_mode"],
109
            data["remediated_by"], data["datastream"])
110 1
        self.when = data["run_timestamp"]
111
112 1
        self.passed_stages = {key: data[key] for key in self.STAGE_STRINGS if key in data}
113 1
        self.passed_stages_count = sum(self.passed_stages.values())
114
115 1
        self.success = data.get("final_scan", False)
116 1
        if not self.success:
117 1
            self.success = (
118
                "remediation" not in data
119
                and data.get("initial_scan", False))
120
121 1
    def save_to_dict(self):
122 1
        data = dict()
123 1
        data["rule_id"] = self.scenario.rule_id
124 1
        data["scenario_script"] = self.scenario.script
125
126 1
        data["backend"] = self.conditions.backend
127 1
        data["scanning_mode"] = self.conditions.scanning_mode
128 1
        data["remediated_by"] = self.conditions.remediated_by
129 1
        data["datastream"] = self.conditions.datastream
130
131 1
        data["run_timestamp"] = self.when
132
133 1
        for stage_str, result in self.passed_stages.items():
134 1
            data[stage_str] = result
135
136 1
        return data
137
138 1
    def record_stage_result(self, stage, successful):
139
        assert stage in self.STAGE_STRINGS, (
140
            "Stage name {name} is invalid, choose one from {choices}"
141
            .format(name=stage, choices=", ".join(self.STAGE_STRINGS))
142
        )
143
        self.passed_stages[stage] = successful
144
145 1
    def relative_conditions_to(self, other):
146 1
        if self.conditions == other.conditions:
147
            return self.when, other.when
148
        else:
149 1
            return tuple(self.conditions), tuple(other.conditions)
150
151 1
    def __eq__(self, other):
152 1
        return (self.success == other.success
153
                and tuple(self.passed_stages) == tuple(self.passed_stages))
154
155 1
    def __lt__(self, other):
156 1
        return self.passed_stages_count > other.passed_stages_count
157
158
159 1
def run_cmd_local(command, verbose_path, env=None):
160
    command_string = ' '.join(command)
161
    logging.debug('Running {}'.format(command_string))
162
    returncode, output = _run_cmd(command, verbose_path, env)
163
    return returncode, output
164
165
166 1
def _run_cmd(command_list, verbose_path, env=None):
167
    returncode = 0
168
    output = b""
169
    try:
170
        with open(verbose_path, 'w') as verbose_file:
171
            output = subprocess.check_output(
172
                command_list, stderr=verbose_file, env=env)
173
    except subprocess.CalledProcessError as e:
174
        returncode = e.returncode
175
        output = e.output
176
    return returncode, output.decode('utf-8')
177
178
179 1
def _get_platform_cpes(platform):
180 1
    ssg_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
181 1
    if platform.startswith("multi_platform_"):
182 1
        try:
183 1
            products = MULTI_PLATFORM_MAPPING[platform]
184 1
        except KeyError:
185 1
            logging.error(
186
                "Unknown multi_platform specifier: %s is not from %s"
187
                % (platform, ", ".join(MULTI_PLATFORM_MAPPING.keys())))
188 1
            raise ValueError
189 1
        platform_cpes = set()
190 1
        for p in products:
191 1
            product_yaml_path = os.path.join(ssg_root, "products", p, "product.yml")
192 1
            product_yaml = load_product_yaml(product_yaml_path)
193 1
            p_cpes = ProductCPEs(product_yaml)
194 1
            platform_cpes |= set(p_cpes.get_product_cpe_names())
195 1
        return platform_cpes
196
    else:
197
        # scenario platform is specified by a full product name
198 1
        try:
199 1
            product = FULL_NAME_TO_PRODUCT_MAPPING[platform]
200 1
        except KeyError:
201 1
            logging.error(
202
                "Unknown product name: %s is not from %s"
203
                % (platform, ", ".join(FULL_NAME_TO_PRODUCT_MAPPING.keys())))
204 1
            raise ValueError
205 1
        product_yaml_path = os.path.join(ssg_root, "products", product, "product.yml")
206 1
        product_yaml = load_product_yaml(product_yaml_path)
207 1
        product_cpes = ProductCPEs(product_yaml)
208 1
        platform_cpes = set(product_cpes.get_product_cpe_names())
209 1
        return platform_cpes
210
211
212 1
def matches_platform(scenario_platforms, benchmark_cpes):
213 1
    if "multi_platform_all" in scenario_platforms:
214 1
        return True
215 1
    scenario_cpes = set()
216 1
    for p in scenario_platforms:
217 1
        scenario_cpes |= _get_platform_cpes(p)
218 1
    return len(scenario_cpes & benchmark_cpes) > 0
219
220
221 1
def run_with_stdout_logging(command, args, log_file):
222
    log_file.write("{0} {1}\n".format(command, " ".join(args)))
223
    result = subprocess.run(
224
            (command,) + args, encoding="utf-8", stdout=subprocess.PIPE,
225
            stderr=subprocess.PIPE, check=True)
226
    if result.stdout:
227
        log_file.write("STDOUT: ")
228
        log_file.write(result.stdout)
229
    if result.stderr:
230
        log_file.write("STDERR: ")
231
        log_file.write(result.stderr)
232
    return result.stdout
233
234
235 1
def _exclude_garbage(tarinfo):
236
    file_name = tarinfo.name
237
    if file_name.endswith('pyc'):
238
        return None
239
    if file_name.endswith('swp'):
240
        return None
241
    return tarinfo
242
243
244 1
def _make_file_root_owned(tarinfo):
245
    if tarinfo:
246
        tarinfo.uid = 0
247
        tarinfo.gid = 0
248
    return tarinfo
249
250
251 1
def _rel_abs_path(current_path, base_path):
252
    """
253
    Return the value of the current path, relative to the base path, but
254
    resolving paths absolutely first. This helps when walking a nested
255
    directory structure and want to get the subtree relative to the original
256
    path
257
    """
258
    tmp_path = os.path.abspath(current_path)
259
    return os.path.relpath(current_path, base_path)
260
261
262 1
def template_tests(product=None):
263
    """
264
    Create a temporary directory with test cases parsed via jinja using
265
    product-specific context.
266
    """
267
    # Set up an empty temp directory
268
    tmpdir = tempfile.mkdtemp()
269
270
    # We want to remove the temporary directory on failure, but preserve
271
    # it on success. Wrap in a try/except block and reraise the original
272
    # exception after removing the temporary directory.
273
    try:
274
        # Load product's YAML file if present. This will allow us to parse
275
        # tests in the context of the product we're executing under.
276
        product_yaml = dict()
277
        if product:
278
            yaml_path = product_yaml_path(SSG_ROOT, product)
279
            product_yaml = load_product_yaml(yaml_path)
280
281
        # Below we could run into a DocumentationNotComplete error. However,
282
        # because the test suite isn't executed in the context of a particular
283
        # build (though, ideally it would be linked), we may not know exactly
284
        # whether the top-level rule/profile we're testing is actually
285
        # completed. Thus, forcibly set the required property to bypass this
286
        # error.
287
        product_yaml['cmake_build_type'] = 'Debug'
288
289
        # Note that we're not exactly copying 1-for-1 the contents of the
290
        # directory structure into the temporary one. Instead we want a
291
        # flattened mapping with all rules in a single top-level directory
292
        # and all tests immediately contained within it. That is:
293
        #
294
        # /group_a/rule_a/tests/something.pass.sh -> /rule_a/something.pass.sh
295
        for dirpath, dirnames, _ in walk_through_benchmark_dirs(product):
296
            # Skip anything that isn't obviously a rule.
297
            if "tests" not in dirnames or not is_rule_dir(dirpath):
298
                continue
299
300
            # Load rule content in our environment. We use this to satisfy
301
            # some implied properties that might be used in the test suite.
302
            rule_path = get_rule_dir_yaml(dirpath)
303
            rule = RuleYAML.from_yaml(rule_path, product_yaml)
304
305
            # Note that most places would check prodtype, but we don't care
306
            # about that here: if the rule is available to the product, we
307
            # load and parse it anyways as we have no knowledge of the
308
            # top-level profile or rule passed into the test suite.
309
            prodtypes = parse_prodtype(rule.prodtype)
310
311
            # Our local copy of env_yaml needs some properties from rule.yml
312
            # for completeness.
313
            local_env_yaml = dict()
314
            local_env_yaml.update(product_yaml)
315
            local_env_yaml['rule_id'] = rule.id_
316
            local_env_yaml['rule_title'] = rule.title
317
            local_env_yaml['products'] = prodtypes
318
319
            # Create the destination directory.
320
            dest_path = os.path.join(tmpdir, rule.id_)
321
            os.mkdir(dest_path)
322
323
            # Walk the test directory, writing all tests into the output
324
            # directory, recursively.
325
            tests_dir_path = os.path.join(dirpath, "tests")
326
            tests_dir_path = os.path.abspath(tests_dir_path)
327
            for dirpath, dirnames, filenames in os.walk(tests_dir_path):
328
                for dirname in dirnames:
329
                    # We want to recreate the correct path under the temporary
330
                    # directory. Resolve it to a relative path from the tests/
331
                    # directory.
332
                    dir_path = _rel_abs_path(os.path.join(dirpath, dirname), tests_dir_path)
333
                    assert '../' not in dir_path
334
                    tmp_dir_path = os.path.join(dest_path, dir_path)
335
                    os.mkdir(tmp_dir_path)
336
337
                for filename in filenames:
338
                    # We want to recreate the correct path under the temporary
339
                    # directory. Resolve it to a relative path from the tests/
340
                    # directory. Assumption: directories should be created
341
                    # prior to recursing into them, so we don't need to handle
342
                    # if a file's parent directory doesn't yet exist under the
343
                    # destination.
344
                    src_test_path = os.path.join(dirpath, filename)
345
                    rel_test_path = _rel_abs_path(src_test_path, tests_dir_path)
346
                    dest_test_path = os.path.join(dest_path, rel_test_path)
347
348
                    # Rather than performing an OS-level copy, we need to
349
                    # first parse the test with jinja and then write it back
350
                    # out to the destination.
351
                    parsed_test = process_file(src_test_path, local_env_yaml)
352
                    with open(dest_test_path, 'w') as output_fp:
353
                        print(parsed_test, file=output_fp)
354
355
    except Exception as exp:
356
        shutil.rmtree(tmpdir, ignore_errors=True)
357
        raise exp
358
359
    return tmpdir
360
361
362 1
def create_tarball(product):
363
    """Create a tarball which contains all test scenarios for every rule.
364
    Tarball contains directories with the test scenarios. The name of the
365
    directories is the same as short rule ID. There is no tree structure.
366
    """
367
    templated_tests = template_tests(product=product)
368
369
    try:
370
        with tempfile.NamedTemporaryFile(
371
                "wb", suffix=".tar.gz", delete=False) as fp:
372
            with tarfile.TarFile.open(fileobj=fp, mode="w") as tarball:
373
                tarball.add(_SHARED_DIR, arcname="shared", filter=_make_file_root_owned)
374
                for rule_id in os.listdir(templated_tests):
375
                    # When a top-level directory exists under the temporary
376
                    # templated tests directory, we've already validated that
377
                    # it is a valid rule directory. Thus we can simply add it
378
                    # to the tarball.
379
                    absolute_dir = os.path.join(templated_tests, rule_id)
380
                    if not os.path.isdir(absolute_dir):
381
                        continue
382
383
                    tarball.add(
384
                        absolute_dir, arcname=rule_id,
385
                        filter=lambda tinfo: _exclude_garbage(_make_file_root_owned(tinfo))
386
                    )
387
388
            # Since we've added the templated contents into the tarball, we
389
            # can now delete the tree.
390
            shutil.rmtree(templated_tests, ignore_errors=True)
391
            return fp.name
392
    except Exception as exp:
393
        shutil.rmtree(templated_tests, ignore_errors=True)
394
        raise exp
395
396
397 1
def send_scripts(test_env):
398
    remote_dir = REMOTE_TEST_SCENARIOS_DIRECTORY
399
    archive_file = create_tarball(test_env.product)
400
    archive_file_basename = os.path.basename(archive_file)
401
    remote_archive_file = os.path.join(remote_dir, archive_file_basename)
402
    logging.debug("Uploading scripts.")
403
    log_file_name = os.path.join(LogHelper.LOG_DIR, "env-preparation.log")
404
405
    with open(log_file_name, 'a') as log_file:
406
        print("Setting up test setup scripts", file=log_file)
407
408
        test_env.execute_ssh_command(
409
            "mkdir -p {remote_dir}".format(remote_dir=remote_dir),
410
            log_file, "Cannot create directory {0}".format(remote_dir))
411
        test_env.scp_upload_file(
412
            archive_file, remote_dir,
413
            log_file, "Cannot copy archive {0} to the target machine's directory {1}"
414
            .format(archive_file, remote_dir))
415
        test_env.execute_ssh_command(
416
            "tar xf {remote_archive_file} -C {remote_dir}"
417
            .format(remote_dir=remote_dir, remote_archive_file=remote_archive_file),
418
            log_file, "Cannot extract data tarball {0}".format(remote_archive_file))
419
    os.unlink(archive_file)
420
    return remote_dir
421
422
423 1
def iterate_over_rules(product=None):
424
    """Iterate over rule directories which have test scenarios".
425
426
    Returns:
427
        Named tuple Rule having these fields:
428
            directory -- absolute path to the rule "tests" subdirectory
429
                         containing the test scenarios in Bash
430
            id -- full rule id as it is present in datastream
431
            short_id -- short rule ID, the same as basename of the directory
432
                        containing the test scenarios in Bash
433
            files -- list of executable .sh files in the "tests" directory
434
    """
435
    for dirpath, dirnames, filenames in walk_through_benchmark_dirs(product):
436
        if "rule.yml" in filenames and "tests" in dirnames:
437
            short_rule_id = os.path.basename(dirpath)
438
            tests_dir = os.path.join(dirpath, "tests")
439
            tests_dir_files = os.listdir(tests_dir)
440
            # Filter out everything except the shell test scenarios.
441
            # Other files in rule directories are editor swap files
442
            # or other content than a test case.
443
            scripts = filter(lambda x: x.endswith(".sh"), tests_dir_files)
444
            full_rule_id = OSCAP_RULE + short_rule_id
445
            result = Rule(
446
                directory=tests_dir, id=full_rule_id, short_id=short_rule_id,
447
                files=scripts)
448
            yield result
449
450
451 1
def get_cpe_of_tested_os(test_env, log_file):
452
    os_release_file = "/etc/os-release"
453
    cpe_line = test_env.execute_ssh_command(
454
        "grep CPE_NAME {os_release_file}".format(os_release_file=os_release_file),
455
        log_file)
456
    # We are parsing an assignment that is possibly quoted
457
    cpe = re.match(r'''CPE_NAME=(["']?)(.*)\1''', cpe_line)
458
    if cpe and cpe.groups()[1]:
459
        return cpe.groups()[1]
460
    msg = ["Unable to get a CPE of the system running tests"]
461
    if cpe_line:
462
        msg.append(
463
            "Retreived a CPE line that we couldn't parse: {cpe_line}"
464
            .format(cpe_line=cpe_line))
465
    else:
466
        msg.append(
467
            "Couldn't get CPE entry from '{os_release_file}'"
468
            .format(os_release_file=os_release_file))
469
    raise RuntimeError("\n".join(msg))
470
471
472 1
INSTALL_COMMANDS = dict(
473
    fedora=("dnf", "install", "-y"),
474
    rhel7=("yum", "install", "-y"),
475
    rhel8=("yum", "install", "-y"),
476
    ubuntu=("DEBIAN_FRONTEND=noninteractive", "apt", "install", "-y"),
477
)
478
479
480 1
def install_packages(test_env, packages):
481
    log_file_name = os.path.join(LogHelper.LOG_DIR, "env-preparation.log")
482
483
    with open(log_file_name, "a") as log_file:
484
        platform_cpe = get_cpe_of_tested_os(test_env, log_file)
485
    platform = cpes_to_platform([platform_cpe])
486
487
    command_str = " ".join(INSTALL_COMMANDS[platform] + tuple(packages))
488
489
    with open(log_file_name, 'a') as log_file:
490
        print("Installing packages", file=log_file)
491
        log_file.flush()
492
        test_env.execute_ssh_command(
493
            command_str, log_file,
494
            "Couldn't install required packages {packages}".format(packages=packages))
495
496
497 1
def cpes_to_platform(cpes):
498
    for cpe in cpes:
499
        if "fedora" in cpe:
500
            return "fedora"
501
        if "redhat:enterprise_linux" in cpe:
502
            match = re.search(r":enterprise_linux:([^:]+):", cpe)
503
            if match:
504
                major_version = match.groups()[0].split(".")[0]
505
                return "rhel" + major_version
506
        if "ubuntu" in cpe:
507
            return "ubuntu"
508
    msg = "Unable to deduce a platform from these CPEs: {cpes}".format(cpes=cpes)
509
    raise ValueError(msg)
510