Completed
Pull Request — master (#5550)
by Matěj
02:20
created

ssg_test_suite.common.create_tarball()   B

Complexity

Conditions 6

Size

Total Lines 18
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 32.244

Importance

Changes 0
Metric Value
cc 6
eloc 13
nop 0
dl 0
loc 18
ccs 1
cts 10
cp 0.1
crap 32.244
rs 8.6666
c 0
b 0
f 0
1 1
import os
2 1
import logging
3 1
import subprocess
4 1
from collections import namedtuple
5 1
import functools
6 1
import tarfile
7 1
import tempfile
8
9 1
from ssg.constants import MULTI_PLATFORM_MAPPING
10 1
from ssg.constants import PRODUCT_TO_CPE_MAPPING
11 1
from ssg.constants import FULL_NAME_TO_PRODUCT_MAPPING
12 1
from ssg.constants import OSCAP_RULE
13 1
from ssg_test_suite.log import LogHelper
14
# Lightweight record types shared across the test suite.

# One executed test scenario: the rule it exercised and the scenario script.
Scenario_run = namedtuple("Scenario_run", ["rule_id", "script"])

# The environment a scenario ran under.
Scenario_conditions = namedtuple(
    "Scenario_conditions",
    ["backend", "scanning_mode", "remediated_by", "datastream"])

# A rule directory that carries test scenarios.
Rule = namedtuple("Rule", ("directory", "id", "short_id", "files"))
23
24 1
_BENCHMARK_DIRS = [
25
        os.path.abspath(os.path.join(os.path.dirname(__file__), '../../linux_os/guide')),
26
        os.path.abspath(os.path.join(os.path.dirname(__file__), '../../applications')),
27
        ]
28
29 1
_SHARED_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '../shared'))
30
31 1
REMOTE_USER = "root"
32 1
REMOTE_USER_HOME_DIRECTORY = "/root"
33 1
REMOTE_TEST_SCENARIOS_DIRECTORY = os.path.join(REMOTE_USER_HOME_DIRECTORY, "ssgts")
34
# Extra options appended to every ssh/scp invocation.  Users may extend them
# through the SSH_ADDITIONAL_OPTIONS environment variable (whitespace-
# separated).  Supplying a default of "" makes the unset case explicit
# instead of relying on catching the AttributeError from None.split().
SSH_ADDITIONAL_OPTS = tuple(os.environ.get('SSH_ADDITIONAL_OPTIONS', '').split())

# Host-key checking is disabled: test machines are disposable and present a
# fresh host key on every provisioning.
SSH_ADDITIONAL_OPTS = (
    "-o", "StrictHostKeyChecking=no",
    "-o", "UserKnownHostsFile=/dev/null",
) + SSH_ADDITIONAL_OPTS
45
46
def walk_through_benchmark_dirs():
    """Yield os.walk() triples (dirpath, dirnames, filenames) for every
    directory under each benchmark content root in _BENCHMARK_DIRS."""
    for benchmark_root in _BENCHMARK_DIRS:
        for walk_entry in os.walk(benchmark_root):
            yield walk_entry
51
52
class Stage(object):
    """Ordered identifiers for the phases of a single rule test run."""

    NONE = 0
    PREPARATION = 1
    INITIAL_SCAN = 2
    REMEDIATION = 3
    FINAL_SCAN = 4
59
60
@functools.total_ordering
class RuleResult(object):
    """
    Result of a test suite testing rule under a scenario.

    Supports ordering by success - the most successful run orders first.
    """
    # NOTE: the docstring above was originally placed after STAGE_STRINGS,
    # where it was a no-op string expression rather than the class __doc__.

    # Names of the run stages that may be recorded in passed_stages.
    STAGE_STRINGS = {
        "preparation",
        "initial_scan",
        "remediation",
        "final_scan",
    }

    def __init__(self, result_dict=None):
        # Defaults describe an "empty" run; load_from_dict overwrites them.
        self.scenario = Scenario_run("", "")
        self.conditions = Scenario_conditions("", "", "", "")
        self.when = ""
        self.passed_stages = dict()
        self.passed_stages_count = 0
        self.success = False

        if result_dict:
            self.load_from_dict(result_dict)

    def load_from_dict(self, data):
        """Populate this result from a flat dict (inverse of save_to_dict)."""
        self.scenario = Scenario_run(data["rule_id"], data["scenario_script"])
        self.conditions = Scenario_conditions(
            data["backend"], data["scanning_mode"],
            data["remediated_by"], data["datastream"])
        self.when = data["run_timestamp"]

        self.passed_stages = {key: data[key] for key in self.STAGE_STRINGS if key in data}
        self.passed_stages_count = sum(self.passed_stages.values())

        # A run succeeds when the final scan passed, or - for scenarios that
        # have no remediation stage at all - when the initial scan passed.
        self.success = data.get("final_scan", False)
        if not self.success:
            self.success = (
                "remediation" not in data
                and data.get("initial_scan", False))

    def save_to_dict(self):
        """Serialize this result into a flat dict (inverse of load_from_dict)."""
        data = dict()
        data["rule_id"] = self.scenario.rule_id
        data["scenario_script"] = self.scenario.script

        data["backend"] = self.conditions.backend
        data["scanning_mode"] = self.conditions.scanning_mode
        data["remediated_by"] = self.conditions.remediated_by
        data["datastream"] = self.conditions.datastream

        data["run_timestamp"] = self.when

        for stage_str, result in self.passed_stages.items():
            data[stage_str] = result

        return data

    def record_stage_result(self, stage, successful):
        """Record the boolean outcome of one stage.

        NOTE(review): this intentionally mirrors the original and does not
        refresh passed_stages_count or success - confirm callers recompute
        via load_from_dict before relying on ordering.
        """
        assert stage in self.STAGE_STRINGS, (
            "Stage name {name} is invalid, choose one from {choices}"
            .format(name=stage, choices=", ".join(self.STAGE_STRINGS))
        )
        self.passed_stages[stage] = successful

    def relative_conditions_to(self, other):
        """Return timestamps when conditions match, else the two condition
        tuples - whatever distinguishes the runs."""
        if self.conditions == other.conditions:
            return self.when, other.when
        else:
            return tuple(self.conditions), tuple(other.conditions)

    def __eq__(self, other):
        # Bug fix: the original compared self.passed_stages against itself,
        # so any two results with equal .success compared equal.
        return (self.success == other.success
                and tuple(self.passed_stages) == tuple(other.passed_stages))

    def __lt__(self, other):
        # More passed stages orders first (total_ordering fills in the rest).
        return self.passed_stages_count > other.passed_stages_count
138
139
def run_cmd_local(command, verbose_path, env=None):
    """Run command (a list of args) on the local machine.

    stderr is appended to the file at verbose_path; returns a
    (returncode, stdout_string) pair.
    """
    logging.debug('Running {}'.format(' '.join(command)))
    return _run_cmd(command, verbose_path, env)
145
146
def run_cmd_remote(command_string, domain_ip, verbose_path, env=None):
    """Run a shell command string over ssh on the machine at domain_ip.

    stderr is captured into verbose_path; returns a
    (returncode, stdout_string) pair.
    """
    target = '{0}@{1}'.format(REMOTE_USER, domain_ip)
    full_command = ['ssh'] + list(SSH_ADDITIONAL_OPTS) + [target, command_string]
    logging.debug('Running {}'.format(command_string))
    return _run_cmd(full_command, verbose_path, env)
153
154
155 1
def _run_cmd(command_list, verbose_path, env=None):
156
    returncode = 0
157
    output = b""
158
    try:
159
        with open(verbose_path, 'w') as verbose_file:
160
            output = subprocess.check_output(
161
                command_list, stderr=verbose_file, env=env)
162
    except subprocess.CalledProcessError as e:
163
        returncode = e.returncode
164
        output = e.output
165
    return returncode, output.decode('utf-8')
166
167
def _get_platform_cpes(platform):
    """Translate a scenario platform string into a set of CPE identifiers.

    platform is either a "multi_platform_*" specifier or a full product
    name.  Raises ValueError (with an explanatory message) when the
    specifier is unknown.
    """
    if platform.startswith("multi_platform_"):
        try:
            products = MULTI_PLATFORM_MAPPING[platform]
        except KeyError:
            msg = ("Unknown multi_platform specifier: %s is not from %s"
                   % (platform, ", ".join(MULTI_PLATFORM_MAPPING.keys())))
            logging.error(msg)
            # Carry the explanation inside the exception instead of raising
            # a bare ValueError; callers catching ValueError are unaffected.
            raise ValueError(msg)
        platform_cpes = set()
        for product in products:
            platform_cpes |= set(PRODUCT_TO_CPE_MAPPING[product])
        return platform_cpes
    else:
        # scenario platform is specified by a full product name
        try:
            product = FULL_NAME_TO_PRODUCT_MAPPING[platform]
        except KeyError:
            msg = ("Unknown product name: %s is not from %s"
                   % (platform, ", ".join(FULL_NAME_TO_PRODUCT_MAPPING.keys())))
            logging.error(msg)
            raise ValueError(msg)
        return set(PRODUCT_TO_CPE_MAPPING[product])
192
193
def matches_platform(scenario_platforms, benchmark_cpes):
    """Decide whether any of the scenario's platforms applies to a
    benchmark identified by benchmark_cpes (a set of CPE strings).

    The special platform "multi_platform_all" matches unconditionally.
    """
    if "multi_platform_all" in scenario_platforms:
        return True
    cpes_of_scenario = set()
    for platform in scenario_platforms:
        cpes_of_scenario |= _get_platform_cpes(platform)
    return len(cpes_of_scenario & benchmark_cpes) > 0
201
202
def run_with_stdout_logging(command, args, log_file):
    """Run command with args, echoing the command line and its combined
    stdout/stderr into log_file.

    Raises subprocess.CalledProcessError when the command exits non-zero.
    """
    log_file.write("{0} {1}\n".format(command, " ".join(args)))
    full_command = (command,) + args
    subprocess.check_call(full_command, stdout=log_file, stderr=subprocess.STDOUT)
207
208
209 1
def _exclude_garbage(tarinfo):
210
    file_name = tarinfo.name
211
    if file_name.endswith('pyc'):
212
        return None
213
    if file_name.endswith('swp'):
214
        return None
215
    return tarinfo
216
217
218 1
def _make_file_root_owned(tarinfo):
219
    if tarinfo:
220
        tarinfo.uid = 0
221
        tarinfo.gid = 0
222
    return tarinfo
223
224
def create_tarball():
    """Create a tarball which contains all test scenarios for every rule.

    Tarball contains directories with the test scenarios. The name of the
    directories is the same as short rule ID. There is no tree structure.
    Returns the path of the created temporary archive; the caller is
    responsible for removing it.
    """
    def scenario_filter(tinfo):
        # Chain both filters: force root ownership, then drop garbage files.
        return _exclude_garbage(_make_file_root_owned(tinfo))

    # NOTE(review): the archive is written uncompressed (mode "w") despite
    # the .tar.gz suffix; "tar xf" on the remote side auto-detects, so it
    # extracts fine - confirm before changing the mode.
    with tempfile.NamedTemporaryFile(
            "wb", suffix=".tar.gz", delete=False) as fp:
        with tarfile.TarFile.open(fileobj=fp, mode="w") as tarball:
            tarball.add(_SHARED_DIR, arcname="shared", filter=_make_file_root_owned)
            for dirpath, dirnames, _ in walk_through_benchmark_dirs():
                if "tests" not in dirnames:
                    continue
                rule_id = os.path.basename(dirpath)
                tests_dir_path = os.path.join(dirpath, "tests")
                tarball.add(tests_dir_path, arcname=rule_id, filter=scenario_filter)
        return fp.name
243
244
def _run_remote_step(command, args, log_file, error_msg):
    """Run one upload step, converting any failure into RuntimeError."""
    try:
        run_with_stdout_logging(command, args, log_file)
    except Exception:
        logging.error(error_msg)
        raise RuntimeError(error_msg)


def send_scripts(domain_ip):
    """Upload the test-scenario tarball to domain_ip and unpack it there.

    Creates the remote scenarios directory, copies the archive over scp,
    extracts it, and removes the local archive.  Returns the remote
    directory path.  Raises RuntimeError when any remote step fails.
    """
    remote_dir = REMOTE_TEST_SCENARIOS_DIRECTORY
    archive_file = create_tarball()
    archive_file_basename = os.path.basename(archive_file)
    remote_archive_file = os.path.join(remote_dir, archive_file_basename)
    machine = "{0}@{1}".format(REMOTE_USER, domain_ip)
    logging.debug("Uploading scripts.")
    log_file_name = os.path.join(LogHelper.LOG_DIR, "data.upload.log")

    with open(log_file_name, 'a') as log_file:
        _run_remote_step(
            "ssh",
            SSH_ADDITIONAL_OPTS + (machine, "mkdir", "-p", remote_dir),
            log_file,
            "Cannot create directory {0}.".format(remote_dir))
        _run_remote_step(
            "scp",
            SSH_ADDITIONAL_OPTS
            + (archive_file, "{0}:{1}".format(machine, remote_dir)),
            log_file,
            "Cannot copy archive {0} to the target machine's directory {1}."
            .format(archive_file, remote_dir))
        _run_remote_step(
            "ssh",
            SSH_ADDITIONAL_OPTS
            + (machine, "tar xf {0} -C {1}".format(remote_archive_file, remote_dir)),
            log_file,
            "Cannot extract data tarball {0}.".format(remote_archive_file))
    os.unlink(archive_file)
    return remote_dir
283
284
def iterate_over_rules():
    """Iterate over rule directories which have test scenarios.

    Returns:
        Named tuple Rule having these fields:
            directory -- absolute path to the rule "tests" subdirectory
                         containing the test scenarios in Bash
            id -- full rule id as it is present in datastream
            short_id -- short rule ID, the same as basename of the directory
                        containing the test scenarios in Bash
            files -- list of executable .sh files in the "tests" directory
    """
    for dirpath, dirnames, filenames in walk_through_benchmark_dirs():
        if "rule.yml" not in filenames or "tests" not in dirnames:
            continue
        short_rule_id = os.path.basename(dirpath)
        tests_dir = os.path.join(dirpath, "tests")
        # Filter out everything except the shell test scenarios.
        # Other files in rule directories are editor swap files
        # or other content than a test case.
        # Materialized into a list so "files" matches the docstring and can
        # be iterated more than once (filter() is a one-shot iterator on
        # Python 3).
        scripts = [name for name in os.listdir(tests_dir)
                   if name.endswith(".sh")]
        full_rule_id = OSCAP_RULE + short_rule_id
        yield Rule(
            directory=tests_dir, id=full_rule_id, short_id=short_rule_id,
            files=scripts)
311