Passed
Push — master ( 9a42e5...061c2c )
by Jan
02:45 queued 13s
created

ssg_test_suite.common._exclude_garbage()   A

Complexity

Conditions 3

Size

Total Lines 7
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 8.6667

Importance

Changes 0
Metric Value
cc 3
eloc 7
nop 1
dl 0
loc 7
ccs 1
cts 7
cp 0.1429
crap 8.6667
rs 10
c 0
b 0
f 0
1 1
from __future__ import print_function
2
3 1
import os
4 1
import logging
5 1
import subprocess
6 1
from collections import namedtuple
7 1
import functools
8 1
import tarfile
9 1
import tempfile
10 1
import re
11
12 1
from ssg.constants import MULTI_PLATFORM_MAPPING
13 1
from ssg.constants import PRODUCT_TO_CPE_MAPPING
14 1
from ssg.constants import FULL_NAME_TO_PRODUCT_MAPPING
15 1
from ssg.constants import OSCAP_RULE
16 1
from ssg_test_suite.log import LogHelper
17
18 1
# Lightweight record types used throughout the test suite.

# One execution of a test scenario: the rule it belongs to and the script run.
Scenario_run = namedtuple("Scenario_run", ["rule_id", "script"])

# The environment/conditions a scenario ran under.
Scenario_conditions = namedtuple(
    "Scenario_conditions",
    ["backend", "scanning_mode", "remediated_by", "datastream"],
)

# A rule directory that carries test scenarios.
Rule = namedtuple("Rule", ("directory", "id", "short_id", "files"))
26
27 1
# Directory holding this module; every content path below is derived from it.
_THIS_DIR = os.path.dirname(__file__)

# Benchmark content trees that are scanned for rules carrying test scenarios.
_BENCHMARK_DIRS = [
    os.path.abspath(os.path.join(_THIS_DIR, '../../linux_os/guide')),
    os.path.abspath(os.path.join(_THIS_DIR, '../../applications')),
]

# Resources shared between all test scenarios.
_SHARED_DIR = os.path.abspath(os.path.join(_THIS_DIR, '../shared'))

# Account used to log in to the scanned machine.
REMOTE_USER = "root"
REMOTE_USER_HOME_DIRECTORY = "/root"
# Remote directory the test scenarios get uploaded into.
REMOTE_TEST_SCENARIOS_DIRECTORY = os.path.join(REMOTE_USER_HOME_DIRECTORY, "ssgts")
37
38 1
# Extra options passed to every ssh/scp invocation.  More options can be
# injected through the SSH_ADDITIONAL_OPTIONS environment variable; when it
# is unset, only the defaults below apply.  Using os.environ.get with a ''
# default replaces the original try/except AttributeError dance around
# None.split() with the idiomatic form — the resulting tuple is identical.
SSH_ADDITIONAL_OPTS = (
    "-o", "StrictHostKeyChecking=no",
    "-o", "UserKnownHostsFile=/dev/null",
) + tuple(os.environ.get('SSH_ADDITIONAL_OPTIONS', '').split())
48
49
50 1
def walk_through_benchmark_dirs():
    """Yield (dirpath, dirnames, filenames) os.walk tuples for each benchmark tree."""
    for benchmark_dir in _BENCHMARK_DIRS:
        for walk_entry in os.walk(benchmark_dir):
            yield walk_entry
54
55
56 1
class Stage(object):
    """Ordered phases of a single rule test run; higher value means later."""
    NONE, PREPARATION, INITIAL_SCAN, REMEDIATION, FINAL_SCAN = range(5)
62
63
64 1
@functools.total_ordering
class RuleResult(object):
    """
    Result of a test suite testing rule under a scenario.

    Supports ordering by success - the most successful run orders first.
    """

    # Names of the stages whose pass/fail outcome can be recorded.
    STAGE_STRINGS = {
        "preparation",
        "initial_scan",
        "remediation",
        "final_scan",
    }

    def __init__(self, result_dict=None):
        # Scenario identification and the conditions it ran under.
        self.scenario = Scenario_run("", "")
        self.conditions = Scenario_conditions("", "", "", "")
        # Timestamp string of the run.
        self.when = ""
        # Maps stage name -> bool outcome of that stage.
        self.passed_stages = dict()
        self.passed_stages_count = 0
        self.success = False

        if result_dict:
            self.load_from_dict(result_dict)

    def load_from_dict(self, data):
        """Populate this result from a flat dict (inverse of save_to_dict)."""
        self.scenario = Scenario_run(data["rule_id"], data["scenario_script"])
        self.conditions = Scenario_conditions(
            data["backend"], data["scanning_mode"],
            data["remediated_by"], data["datastream"])
        self.when = data["run_timestamp"]

        self.passed_stages = {key: data[key] for key in self.STAGE_STRINGS if key in data}
        self.passed_stages_count = sum(self.passed_stages.values())

        # A run is successful if its final scan passed, or - when there was
        # no remediation stage at all - if the initial scan passed.
        self.success = data.get("final_scan", False)
        if not self.success:
            self.success = (
                "remediation" not in data
                and data.get("initial_scan", False))

    def save_to_dict(self):
        """Serialize this result to a flat dict consumable by load_from_dict."""
        data = dict()
        data["rule_id"] = self.scenario.rule_id
        data["scenario_script"] = self.scenario.script

        data["backend"] = self.conditions.backend
        data["scanning_mode"] = self.conditions.scanning_mode
        data["remediated_by"] = self.conditions.remediated_by
        data["datastream"] = self.conditions.datastream

        data["run_timestamp"] = self.when

        for stage_str, result in self.passed_stages.items():
            data[stage_str] = result

        return data

    def record_stage_result(self, stage, successful):
        """Record the boolean outcome of one stage of the run."""
        assert stage in self.STAGE_STRINGS, (
            "Stage name {name} is invalid, choose one from {choices}"
            .format(name=stage, choices=", ".join(self.STAGE_STRINGS))
        )
        # NOTE(review): passed_stages_count is not refreshed here, only in
        # load_from_dict - confirm whether in-memory ordering should also
        # reflect stages recorded through this method.
        self.passed_stages[stage] = successful

    def relative_conditions_to(self, other):
        """Return timestamps when conditions match, else both condition tuples."""
        if self.conditions == other.conditions:
            return self.when, other.when
        else:
            return tuple(self.conditions), tuple(other.conditions)

    def __eq__(self, other):
        # Fixed: the second operand used to be tuple(self.passed_stages),
        # comparing the stage names with themselves, so equality degenerated
        # to comparing only `success`.  Compare against `other`'s stages.
        # (tuple(dict) yields the keys, i.e. the recorded stage names.)
        return (self.success == other.success
                and tuple(self.passed_stages) == tuple(other.passed_stages))

    def __lt__(self, other):
        # More passed stages orders first ("most successful first").
        return self.passed_stages_count > other.passed_stages_count
141
142
143 1
def run_cmd_local(command, verbose_path, env=None):
    """Execute *command* locally, writing its stderr to *verbose_path*.

    Returns a (returncode, decoded stdout) pair.
    """
    logging.debug('Running {}'.format(' '.join(command)))
    return _run_cmd(command, verbose_path, env)
148
149
150 1
def _run_cmd(command_list, verbose_path, env=None):
    """Run *command_list*, sending its stderr to the file at *verbose_path*.

    Returns (returncode, stdout decoded as UTF-8); a non-zero exit status is
    reported through the return code rather than an exception.
    """
    with open(verbose_path, 'w') as verbose_file:
        try:
            captured = subprocess.check_output(
                command_list, stderr=verbose_file, env=env)
        except subprocess.CalledProcessError as e:
            return e.returncode, e.output.decode('utf-8')
    return 0, captured.decode('utf-8')
161
162
163 1
def _get_platform_cpes(platform):
    """Return the set of CPE ids that *platform* stands for.

    *platform* is either a "multi_platform_*" specifier or a full product
    name.  Raises ValueError (with an explanatory message) when the platform
    is unknown; the message is also logged.
    """
    if platform.startswith("multi_platform_"):
        try:
            products = MULTI_PLATFORM_MAPPING[platform]
        except KeyError:
            msg = (
                "Unknown multi_platform specifier: %s is not from %s"
                % (platform, ", ".join(MULTI_PLATFORM_MAPPING.keys())))
            logging.error(msg)
            # Carry the explanation in the exception too, so callers that do
            # not read the log still see what went wrong (was a bare raise).
            raise ValueError(msg)
        platform_cpes = set()
        for p in products:
            platform_cpes |= set(PRODUCT_TO_CPE_MAPPING[p])
        return platform_cpes
    else:
        # scenario platform is specified by a full product name
        try:
            product = FULL_NAME_TO_PRODUCT_MAPPING[platform]
        except KeyError:
            msg = (
                "Unknown product name: %s is not from %s"
                % (platform, ", ".join(FULL_NAME_TO_PRODUCT_MAPPING.keys())))
            logging.error(msg)
            raise ValueError(msg)
        platform_cpes = set(PRODUCT_TO_CPE_MAPPING[product])
        return platform_cpes
187
188
189 1
def matches_platform(scenario_platforms, benchmark_cpes):
    """Decide whether a scenario applies to a benchmark.

    "multi_platform_all" matches unconditionally; otherwise every scenario
    platform is expanded to its CPE set and tested for overlap with
    *benchmark_cpes*.
    """
    if "multi_platform_all" in scenario_platforms:
        return True
    scenario_cpes = set()
    for platform in scenario_platforms:
        scenario_cpes.update(_get_platform_cpes(platform))
    return bool(scenario_cpes & benchmark_cpes)
196
197
198 1
def _log_captured_output(result, log_file):
    # Helper: append the captured stdout/stderr of *result* to *log_file*.
    if result.stdout:
        log_file.write("STDOUT: ")
        log_file.write(result.stdout)
    if result.stderr:
        log_file.write("STDERR: ")
        log_file.write(result.stderr)


def run_with_stdout_logging(command, args, log_file):
    """Run *command* with *args*, mirroring captured output into *log_file*.

    Returns the command's stdout.  A failing command still raises
    subprocess.CalledProcessError, but - unlike before, where check=True
    raised before any logging happened - its captured stdout/stderr are
    written to the log first so failures can be diagnosed.
    """
    log_file.write("{0} {1}\n".format(command, " ".join(args)))
    try:
        result = subprocess.run(
            (command,) + args, encoding="utf-8", capture_output=True,
            check=True)
    except subprocess.CalledProcessError as e:
        _log_captured_output(e, log_file)
        raise
    _log_captured_output(result, log_file)
    return result.stdout
209
210
211 1
def _exclude_garbage(tarinfo):
    """tarfile filter: drop bytecode/editor leftovers, keep everything else.

    Returns None (= exclude) for names ending in 'pyc' or 'swp', otherwise
    the tarinfo unchanged.
    """
    if tarinfo.name.endswith(('pyc', 'swp')):
        return None
    return tarinfo
218
219
220 1
def _make_file_root_owned(tarinfo):
    """tarfile filter: force uid/gid 0 so extracted files belong to root."""
    if tarinfo is not None:
        tarinfo.uid = 0
        tarinfo.gid = 0
    return tarinfo
225
226
227 1
def create_tarball():
    """Create a tarball which contains all test scenarios for every rule.

    Tarball contains directories with the test scenarios. The name of the
    directories is the same as short rule ID. There is no tree structure.
    Returns the path to the temporary tarball file.
    """
    def sanitize(tinfo):
        # Chain both filters: root ownership first, then garbage exclusion.
        return _exclude_garbage(_make_file_root_owned(tinfo))

    with tempfile.NamedTemporaryFile(
            "wb", suffix=".tar.gz", delete=False) as fp:
        with tarfile.TarFile.open(fileobj=fp, mode="w") as tarball:
            tarball.add(_SHARED_DIR, arcname="shared", filter=_make_file_root_owned)
            for dirpath, dirnames, _ in walk_through_benchmark_dirs():
                if "tests" not in dirnames:
                    continue
                rule_id = os.path.basename(dirpath)
                tarball.add(
                    os.path.join(dirpath, "tests"), arcname=rule_id,
                    filter=sanitize)
        return fp.name
245
246
247 1
def send_scripts(test_env):
    """Upload all test scenarios to the remote machine and unpack them there.

    Returns the remote directory the scenarios were extracted into.
    """
    destination = REMOTE_TEST_SCENARIOS_DIRECTORY
    tarball_path = create_tarball()
    remote_tarball_path = os.path.join(destination, os.path.basename(tarball_path))
    logging.debug("Uploading scripts.")
    preparation_log = os.path.join(LogHelper.LOG_DIR, "env-preparation.log")

    with open(preparation_log, 'a') as log_file:
        print("Setting up test setup scripts", file=log_file)

        test_env.execute_ssh_command(
            "mkdir -p {remote_dir}".format(remote_dir=destination),
            log_file, "Cannot create directory {0}".format(destination))
        test_env.scp_upload_file(
            tarball_path, destination,
            log_file, "Cannot copy archive {0} to the target machine's directory {1}"
            .format(tarball_path, destination))
        test_env.execute_ssh_command(
            "tar xf {remote_archive_file} -C {remote_dir}"
            .format(remote_dir=destination, remote_archive_file=remote_tarball_path),
            log_file, "Cannot extract data tarball {0}".format(remote_tarball_path))
    # The local tarball is no longer needed once extracted remotely.
    os.unlink(tarball_path)
    return destination
271
272
273 1
def iterate_over_rules():
    """Iterate over rule directories which have test scenarios.

    Returns:
        Named tuple Rule having these fields:
            directory -- absolute path to the rule "tests" subdirectory
                         containing the test scenarios in Bash
            id -- full rule id as it is present in datastream
            short_id -- short rule ID, the same as basename of the directory
                        containing the test scenarios in Bash
            files -- iterable of .sh files in the "tests" directory
    """
    for dirpath, dirnames, filenames in walk_through_benchmark_dirs():
        if "rule.yml" not in filenames or "tests" not in dirnames:
            continue
        short_rule_id = os.path.basename(dirpath)
        tests_dir = os.path.join(dirpath, "tests")
        # Only the shell scripts are test scenarios; anything else in the
        # directory (editor swap files and other content) is ignored.
        scenarios = (name for name in os.listdir(tests_dir) if name.endswith(".sh"))
        yield Rule(
            directory=tests_dir,
            id=OSCAP_RULE + short_rule_id,
            short_id=short_rule_id,
            files=scenarios)
299
300
301 1
def get_cpe_of_tested_os(test_env, log_file):
    """Return the CPE string of the OS on the tested machine.

    Reads CPE_NAME from /etc/os-release via the test environment's SSH
    channel.  Raises RuntimeError when the value cannot be obtained or
    parsed.
    """
    os_release_file = "/etc/os-release"
    cpe_line = test_env.execute_ssh_command(
        "grep CPE_NAME {os_release_file}".format(os_release_file=os_release_file),
        log_file)
    # We are parsing an assignment that is possibly quoted
    cpe = re.match(r'''CPE_NAME=(["']?)(.*)\1''', cpe_line)
    if cpe and cpe.groups()[1]:
        return cpe.groups()[1]
    msg = ["Unable to get a CPE of the system running tests"]
    if cpe_line:
        msg.append(
            # Typo fix in the user-facing message: "Retreived" -> "Retrieved".
            "Retrieved a CPE line that we couldn't parse: {cpe_line}"
            .format(cpe_line=cpe_line))
    else:
        msg.append(
            "Couldn't get CPE entry from '{os_release_file}'"
            .format(os_release_file=os_release_file))
    raise RuntimeError("\n".join(msg))
320
321
322 1
# Platform name -> package-installation command prefix for that platform.
INSTALL_COMMANDS = {
    "fedora": ("dnf", "install", "-y"),
    "rhel7": ("yum", "install", "-y"),
    "rhel8": ("yum", "install", "-y"),
}
327
328
329 1
def install_packages(test_env, packages):
    """Install *packages* on the tested machine.

    The installer command is picked according to the platform deduced from
    the machine's CPE.
    """
    log_path = os.path.join(LogHelper.LOG_DIR, "env-preparation.log")

    with open(log_path, "a") as log_file:
        platform_cpe = get_cpe_of_tested_os(test_env, log_file)
    platform = cpes_to_platform([platform_cpe])
    install_command = " ".join(INSTALL_COMMANDS[platform] + tuple(packages))

    with open(log_path, 'a') as log_file:
        print("Installing packages", file=log_file)
        log_file.flush()
        test_env.execute_ssh_command(
            install_command, log_file,
            "Couldn't install required packages {packages}".format(packages=packages))
344
345
346 1
def cpes_to_platform(cpes):
347
    for cpe in cpes:
348
        if "fedora" in cpe:
349
            return "fedora"
350
        if "redhat:enterprise_linux" in cpe:
351
            match = re.search(r":enterprise_linux:([^:]+):", cpe)
352
            if match:
353
                major_version = match.groups()[0].split(".")[0]
354
                return "rhel" + major_version
355
    msg = "Unable to deduce a platform from these CPEs: {cpes}".format(cpes=cpes)
356
    raise ValueError(msg)
357