Test Failed
Push to master (78c0a6...290ebe) by Jan
Ran in 02:36 (queued 10s)

tests.ssg_test_suite.common.send_scripts() (rated A)

Complexity:    Conditions 2
Size:          Total Lines 24, Code Lines 22
Duplication:   Lines 0, Ratio 0 %
Code Coverage: Tests 1, CRAP Score 5.2029
Importance:    Changes 0
Metric  Value
cc      2
eloc    22
nop     1
dl      0
loc     24
ccs     1
cts     14
cp      0.0714
crap    5.2029
rs      9.352
c       0
b       0
f       0
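
For context, the reported CRAP Score follows from the complexity and coverage figures via the standard formula CRAP(m) = comp(m)^2 * (1 - cov(m))^3 + comp(m), taking cc as the complexity and cp as the coverage ratio. A quick check (the variable names are ours, chosen to mirror the metrics):

# CRAP(m) = comp(m)**2 * (1 - cov(m))**3 + comp(m)
comp = 2        # cyclomatic complexity, the "cc" metric
cov = 0.0714    # coverage ratio, the "cp" metric (roughly 1 of 14 statements)
crap = comp ** 2 * (1 - cov) ** 3 + comp
print(round(crap, 4))  # 5.2029, matching the reported CRAP Score
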
from __future__ import print_function

import os
import logging
import subprocess
from collections import namedtuple
import functools
import tarfile
import tempfile
import re

from ssg.build_cpe import ProductCPEs
from ssg.constants import MULTI_PLATFORM_MAPPING
from ssg.constants import FULL_NAME_TO_PRODUCT_MAPPING
from ssg.constants import OSCAP_RULE
from ssg_test_suite.log import LogHelper

Scenario_run = namedtuple(
    "Scenario_run",
    ("rule_id", "script"))
Scenario_conditions = namedtuple(
    "Scenario_conditions",
    ("backend", "scanning_mode", "remediated_by", "datastream"))
Rule = namedtuple(
    "Rule", ["directory", "id", "short_id", "files"])

_BENCHMARK_DIRS = [
        os.path.abspath(os.path.join(os.path.dirname(__file__), '../../linux_os/guide')),
        os.path.abspath(os.path.join(os.path.dirname(__file__), '../../applications')),
        ]

_SHARED_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '../shared'))

REMOTE_USER = "root"
REMOTE_USER_HOME_DIRECTORY = "/root"
REMOTE_TEST_SCENARIOS_DIRECTORY = os.path.join(REMOTE_USER_HOME_DIRECTORY, "ssgts")

try:
    SSH_ADDITIONAL_OPTS = tuple(os.environ.get('SSH_ADDITIONAL_OPTIONS').split())
except AttributeError:
    # If SSH_ADDITIONAL_OPTIONS is not defined, set it to an empty tuple.
    SSH_ADDITIONAL_OPTS = tuple()

SSH_ADDITIONAL_OPTS = (
    "-o", "StrictHostKeyChecking=no",
    "-o", "UserKnownHostsFile=/dev/null",
) + SSH_ADDITIONAL_OPTS

def walk_through_benchmark_dirs():
    for dirname in _BENCHMARK_DIRS:
        for dirpath, dirnames, filenames in os.walk(dirname):
            yield dirpath, dirnames, filenames


class Stage(object):
    NONE = 0
    PREPARATION = 1
    INITIAL_SCAN = 2
    REMEDIATION = 3
    FINAL_SCAN = 4

@functools.total_ordering
class RuleResult(object):
    """
    Result of the test suite testing a rule under a scenario.

    Supports ordering by success - the most successful run orders first.
    """
    STAGE_STRINGS = {
        "preparation",
        "initial_scan",
        "remediation",
        "final_scan",
    }

    def __init__(self, result_dict=None):
        self.scenario = Scenario_run("", "")
        self.conditions = Scenario_conditions("", "", "", "")
        self.when = ""
        self.passed_stages = dict()
        self.passed_stages_count = 0
        self.success = False

        if result_dict:
            self.load_from_dict(result_dict)

    def load_from_dict(self, data):
        self.scenario = Scenario_run(data["rule_id"], data["scenario_script"])
        self.conditions = Scenario_conditions(
            data["backend"], data["scanning_mode"],
            data["remediated_by"], data["datastream"])
        self.when = data["run_timestamp"]

        self.passed_stages = {key: data[key] for key in self.STAGE_STRINGS if key in data}
        self.passed_stages_count = sum(self.passed_stages.values())

        self.success = data.get("final_scan", False)
        if not self.success:
            self.success = (
                "remediation" not in data
                and data.get("initial_scan", False))

    def save_to_dict(self):
        data = dict()
        data["rule_id"] = self.scenario.rule_id
        data["scenario_script"] = self.scenario.script

        data["backend"] = self.conditions.backend
        data["scanning_mode"] = self.conditions.scanning_mode
        data["remediated_by"] = self.conditions.remediated_by
        data["datastream"] = self.conditions.datastream

        data["run_timestamp"] = self.when

        for stage_str, result in self.passed_stages.items():
            data[stage_str] = result

        return data

    def record_stage_result(self, stage, successful):
        assert stage in self.STAGE_STRINGS, (
            "Stage name {name} is invalid, choose one from {choices}"
            .format(name=stage, choices=", ".join(self.STAGE_STRINGS))
        )
        self.passed_stages[stage] = successful

    def relative_conditions_to(self, other):
        if self.conditions == other.conditions:
            return self.when, other.when
        else:
            return tuple(self.conditions), tuple(other.conditions)

    def __eq__(self, other):
        return (self.success == other.success
                and tuple(self.passed_stages) == tuple(other.passed_stages))

    def __lt__(self, other):
        return self.passed_stages_count > other.passed_stages_count

def run_cmd_local(command, verbose_path, env=None):
    command_string = ' '.join(command)
    logging.debug('Running {}'.format(command_string))
    returncode, output = _run_cmd(command, verbose_path, env)
    return returncode, output


def _run_cmd(command_list, verbose_path, env=None):
    returncode = 0
    output = b""
    try:
        with open(verbose_path, 'w') as verbose_file:
            output = subprocess.check_output(
                command_list, stderr=verbose_file, env=env)
    except subprocess.CalledProcessError as e:
        returncode = e.returncode
        output = e.output
    return returncode, output.decode('utf-8')

def _get_platform_cpes(platform):
    if platform.startswith("multi_platform_"):
        try:
            products = MULTI_PLATFORM_MAPPING[platform]
        except KeyError:
            logging.error(
                "Unknown multi_platform specifier: %s is not from %s"
                % (platform, ", ".join(MULTI_PLATFORM_MAPPING.keys())))
            raise ValueError
        platform_cpes = set()
        for p in products:
            p_cpes = ProductCPEs(p)
            platform_cpes |= set(p_cpes.get_product_cpe_names())
        return platform_cpes
    else:
        # scenario platform is specified by a full product name
        try:
            product = FULL_NAME_TO_PRODUCT_MAPPING[platform]
        except KeyError:
            logging.error(
                "Unknown product name: %s is not from %s"
                % (platform, ", ".join(FULL_NAME_TO_PRODUCT_MAPPING.keys())))
            raise ValueError
        product_cpes = ProductCPEs(product)
        platform_cpes = set(product_cpes.get_product_cpe_names())
        return platform_cpes


def matches_platform(scenario_platforms, benchmark_cpes):
    if "multi_platform_all" in scenario_platforms:
        return True
    scenario_cpes = set()
    for p in scenario_platforms:
        scenario_cpes |= _get_platform_cpes(p)
    return len(scenario_cpes & benchmark_cpes) > 0

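matches_platform() is the gatekeeper that decides whether a test scenario applies to the benchmark under test. A minimal usage sketch, assuming the usual ssg.constants mappings; the CPE string and the "Fedora" platform name are illustrative, not taken from this run:

# Illustrative values only; real benchmark CPEs come from the datastream.
benchmark_cpes = {"cpe:/o:fedoraproject:fedora:30"}
matches_platform(["multi_platform_all"], benchmark_cpes)  # True: wildcard platform
matches_platform(["Fedora"], benchmark_cpes)  # True if the Fedora product CPEs intersect
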
def run_with_stdout_logging(command, args, log_file):
    log_file.write("{0} {1}\n".format(command, " ".join(args)))
    result = subprocess.run(
            (command,) + args, encoding="utf-8", stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, check=True)
    if result.stdout:
        log_file.write("STDOUT: ")
        log_file.write(result.stdout)
    if result.stderr:
        log_file.write("STDERR: ")
        log_file.write(result.stderr)
    return result.stdout

def _exclude_garbage(tarinfo):
    file_name = tarinfo.name
    if file_name.endswith('.pyc'):
        return None
    if file_name.endswith('.swp'):
        return None
    return tarinfo


def _make_file_root_owned(tarinfo):
    if tarinfo:
        tarinfo.uid = 0
        tarinfo.gid = 0
    return tarinfo

def create_tarball():
    """Create a tarball which contains all test scenarios for every rule.

    The tarball contains one directory per rule, named after the short
    rule ID, with that rule's test scenarios inside; there is no deeper
    tree structure.
    """
    with tempfile.NamedTemporaryFile(
            "wb", suffix=".tar.gz", delete=False) as fp:
        with tarfile.TarFile.open(fileobj=fp, mode="w") as tarball:
            tarball.add(_SHARED_DIR, arcname="shared", filter=_make_file_root_owned)
            for dirpath, dirnames, _ in walk_through_benchmark_dirs():
                rule_id = os.path.basename(dirpath)
                if "tests" in dirnames:
                    tests_dir_path = os.path.join(dirpath, "tests")
                    tarball.add(
                        tests_dir_path, arcname=rule_id,
                        filter=lambda tinfo: _exclude_garbage(_make_file_root_owned(tinfo))
                    )
        return fp.name

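The flat layout the docstring promises can be verified by listing the archive members; the member names in the comment are hypothetical, since they depend on which rules carry a tests/ directory:

# Quick inspection of the generated archive.
import tarfile
with tarfile.open(create_tarball()) as tarball:
    print(tarball.getnames()[:3])  # e.g. ['shared', 'shared/setup.sh', 'accounts_tmout']
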
def send_scripts(test_env):
    remote_dir = REMOTE_TEST_SCENARIOS_DIRECTORY
    archive_file = create_tarball()
    archive_file_basename = os.path.basename(archive_file)
    remote_archive_file = os.path.join(remote_dir, archive_file_basename)
    logging.debug("Uploading scripts.")
    log_file_name = os.path.join(LogHelper.LOG_DIR, "env-preparation.log")

    with open(log_file_name, 'a') as log_file:
        print("Setting up test setup scripts", file=log_file)

        test_env.execute_ssh_command(
            "mkdir -p {remote_dir}".format(remote_dir=remote_dir),
            log_file, "Cannot create directory {0}".format(remote_dir))
        test_env.scp_upload_file(
            archive_file, remote_dir,
            log_file, "Cannot copy archive {0} to the target machine's directory {1}"
            .format(archive_file, remote_dir))
        test_env.execute_ssh_command(
            "tar xf {remote_archive_file} -C {remote_dir}"
            .format(remote_dir=remote_dir, remote_archive_file=remote_archive_file),
            log_file, "Cannot extract data tarball {0}".format(remote_archive_file))
    os.unlink(archive_file)
    return remote_dir

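send_scripts() is the function this report grades; with cp at 0.0714, the one recorded test exercises only a small fraction of it. A minimal sketch of driving it, assuming only what the calls above imply, namely that test_env provides execute_ssh_command() and scp_upload_file() (the stub class is hypothetical, not part of this module):

# Hypothetical stand-in for the real test environment object.
import sys

class StubEnv(object):
    def execute_ssh_command(self, command, log_file, error_msg=None):
        print("would run over ssh:", command, file=sys.stderr)

    def scp_upload_file(self, source, destination, log_file, error_msg=None):
        print("would scp:", source, "->", destination, file=sys.stderr)

remote_dir = send_scripts(StubEnv())
# remote_dir == "/root/ssgts"; after a real run, each rule's scenarios sit in
# /root/ssgts/<short_rule_id>/ and the shared files in /root/ssgts/shared/.
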
def iterate_over_rules():
    """Iterate over rule directories which have test scenarios.

    Yields:
        Named tuple Rule having these fields:
            directory -- absolute path to the rule "tests" subdirectory
                         containing the test scenarios in Bash
            id -- full rule id as it is present in datastream
            short_id -- short rule ID, the same as basename of the directory
                        containing the test scenarios in Bash
            files -- list of executable .sh files in the "tests" directory
    """
    for dirpath, dirnames, filenames in walk_through_benchmark_dirs():
        if "rule.yml" in filenames and "tests" in dirnames:
            short_rule_id = os.path.basename(dirpath)
            tests_dir = os.path.join(dirpath, "tests")
            tests_dir_files = os.listdir(tests_dir)
            # Filter out everything except the shell test scenarios.
            # Other files in rule directories are editor swap files
            # or other content that is not a test case.
            scripts = filter(lambda x: x.endswith(".sh"), tests_dir_files)
            full_rule_id = OSCAP_RULE + short_rule_id
            result = Rule(
                directory=tests_dir, id=full_rule_id, short_id=short_rule_id,
                files=scripts)
            yield result

def get_cpe_of_tested_os(test_env, log_file):
    os_release_file = "/etc/os-release"
    cpe_line = test_env.execute_ssh_command(
        "grep CPE_NAME {os_release_file}".format(os_release_file=os_release_file),
        log_file)
    # We are parsing an assignment that is possibly quoted.
    cpe = re.match(r'''CPE_NAME=(["']?)(.*)\1''', cpe_line)
    if cpe and cpe.groups()[1]:
        return cpe.groups()[1]
    msg = ["Unable to get a CPE of the system running tests"]
    if cpe_line:
        msg.append(
            "Retrieved a CPE line that we couldn't parse: {cpe_line}"
            .format(cpe_line=cpe_line))
    else:
        msg.append(
            "Couldn't get CPE entry from '{os_release_file}'"
            .format(os_release_file=os_release_file))
    raise RuntimeError("\n".join(msg))

INSTALL_COMMANDS = dict(
    fedora=("dnf", "install", "-y"),
    rhel7=("yum", "install", "-y"),
    rhel8=("yum", "install", "-y"),
    ubuntu=("DEBIAN_FRONTEND=noninteractive", "apt", "install", "-y"),
)


def install_packages(test_env, packages):
    log_file_name = os.path.join(LogHelper.LOG_DIR, "env-preparation.log")

    with open(log_file_name, "a") as log_file:
        platform_cpe = get_cpe_of_tested_os(test_env, log_file)
    platform = cpes_to_platform([platform_cpe])

    command_str = " ".join(INSTALL_COMMANDS[platform] + tuple(packages))

    with open(log_file_name, 'a') as log_file:
        print("Installing packages", file=log_file)
        log_file.flush()
        test_env.execute_ssh_command(
            command_str, log_file,
            "Couldn't install required packages {packages}".format(packages=packages))

def cpes_to_platform(cpes):
    for cpe in cpes:
        if "fedora" in cpe:
            return "fedora"
        if "redhat:enterprise_linux" in cpe:
            match = re.search(r":enterprise_linux:([^:]+):", cpe)
            if match:
                major_version = match.groups()[0].split(".")[0]
                return "rhel" + major_version
        if "ubuntu" in cpe:
            return "ubuntu"
    msg = "Unable to deduce a platform from these CPEs: {cpes}".format(cpes=cpes)
    raise ValueError(msg)
363