Passed
Push — master ( ea7c4a...d61422 )
by Matěj
01:18 queued 12s
created

test_suite   A

Complexity

Total Complexity 31

Size/Duplication

Total Lines 370
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 256
dl 0
loc 370
rs 9.92
c 0
b 0
f 0
wmc 31

9 Functions

Rating   Name   Duplication   Size   Complexity  
A get_logging_dir() 0 18 3
A _print_available_benchmarks() 0 4 2
A auto_select_xccdf_id() 0 23 5
A get_datastreams() 0 6 1
B parse_args() 0 171 1
A get_unique_datastream() 0 11 3
C normalize_passed_arguments() 0 49 9
A datastream_in_stash() 0 7 1
B main() 0 33 6
1
#!/usr/bin/env python2
2
from __future__ import print_function
3
4
import argparse
5
import textwrap
6
import logging
7
import os
8
import os.path
9
import time
10
import sys
11
from glob import glob
12
import re
13
import contextlib
14
import tempfile
15
16
ssg_dir = os.path.join(os.path.dirname(__file__), "..")
17
sys.path.append(ssg_dir)
18
19
from ssg_test_suite.log import LogHelper
20
import ssg_test_suite.oscap
21
import ssg_test_suite.test_env
22
import ssg_test_suite.profile
23
import ssg_test_suite.rule
24
import ssg_test_suite.combined
25
from ssg_test_suite import xml_operations
26
27
28
def parse_args(argv=None):
    """Build the command-line interface and parse the given arguments.

    The interface consists of three subcommands (``profile``, ``rule`` and
    ``combined``) that all inherit a common set of options: the test
    environment backend, datastream / benchmark selection, logging setup,
    scanning mode and remediation type.  Each subcommand stores the function
    that performs the actual check in the ``func`` attribute.

    Args:
        argv: Optional list of argument strings to parse.  When ``None``
            (the default), argparse reads the arguments from ``sys.argv``.

    Returns:
        The ``argparse.Namespace`` with the parsed options.
    """
    parser = argparse.ArgumentParser()

    # Options shared by all subcommands; attached to them through "parents".
    common_parser = argparse.ArgumentParser(add_help=False)
    common_parser.set_defaults(test_env=None)

    # Exactly one test environment backend has to be chosen.
    backends = common_parser.add_mutually_exclusive_group(required=True)

    backends.add_argument(
        "--docker", dest="docker", metavar="BASE_IMAGE",
        help="Use Docker test environment with this base image.")
    backends.add_argument(
        "--container", dest="container", metavar="BASE_IMAGE",
        help="Use container test environment with this base image.")

    backends.add_argument(
        "--libvirt", dest="libvirt", metavar=("HYPERVISOR", "DOMAIN"), nargs=2,
        help="libvirt hypervisor and domain name. When the leading URI driver protocol "
        "is omitted from the hypervisor, qemu:/// protocol is assumed. "
        "Example of a hypervisor domain name tuple: system ssg-test-suite")

    common_parser.add_argument(
        "--datastream",
        dest="datastream",
        metavar="DATASTREAM",
        help=("Path to the Source DataStream on this "
              "machine which is going to be tested"))

    # The benchmark is selected either by its reference ID or by its number.
    benchmarks = common_parser.add_mutually_exclusive_group()
    benchmarks.add_argument(
        "--xccdf-id",
        dest="xccdf_id",
        metavar="REF-ID",
        default=None,
        help="Reference ID related to benchmark to "
             "be used.")
    benchmarks.add_argument(
        "--xccdf-id-number",
        dest="xccdf_id_number",
        metavar="REF-ID-SELECT",
        type=int,
        default=0,
        help="Selection number of reference ID related "
             "to benchmark to be used.")
    common_parser.add_argument(
        "--add-platform",
        metavar="<CPE REGEX>",
        default=None,
        help="Find all CPEs that are present in local OpenSCAP's CPE dictionary "
        "that match the provided regex, "
        "and add them as platforms to all datastream benchmarks. "
        "If the regex doesn't match anything, it will be treated "
        "as a literal CPE, and added as a platform. "
        "For example, use 'cpe:/o:fedoraproject:fedora:30' or 'enterprise_linux'.")
    common_parser.add_argument(
        "--remove-machine-only",
        default=False,
        action="store_true",
        help="Removes machine-only platform constraint from rules "
        "to enable testing these rules on container backends.")
    common_parser.add_argument(
        "--loglevel",
        dest="loglevel",
        metavar="LOGLEVEL",
        default="INFO",
        help="Default level of console output")
    common_parser.add_argument(
        "--logdir",
        dest="logdir",
        metavar="LOGDIR",
        default=None,
        help="Directory to which all output is saved")

    common_parser.add_argument(
        "--mode",
        dest="scanning_mode",
        default="online",
        choices=("online", "offline"),
        help="What type of check to use - either "
        "Online check done by running oscap inside the concerned system, or "
        "offline check that examines the filesystem from the host "
        "(either may require extended privileges).")

    common_parser.add_argument(
        "--remediate-using",
        dest="remediate_using",
        default="oscap",
        choices=ssg_test_suite.oscap.REMEDIATION_RULE_RUNNERS.keys(),
        help="What type of remediations to use - openscap online one, "
        "or remediation done by using remediation roles "
        "that are saved to disk beforehand.")

    subparsers = parser.add_subparsers(dest="subparser_name",
                                       help="Subcommands: profile, rule, combined")
    # Set as an attribute (not a keyword argument) so a subcommand is mandatory.
    subparsers.required = True

    # --- "profile" subcommand -------------------------------------------
    parser_profile = subparsers.add_parser(
        "profile",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=textwrap.dedent("""\
                    In case that tested profile contains rules which might prevent root ssh access
                    to the testing VM consider unselecting these rules. To unselect certain rules
                    from a datastream use `ds_unselect_rules.sh` script. List of such rules already
                    exists, see `unselect_rules_list` file.
                    Example usage:
                        ./ds_unselect_rules.sh ../build/ssg-fedora-ds.xml unselect_rules_list
                                           """),
        help=("Testing profile-based "
              "remediation applied on already "
              "installed machine"),
        parents=[common_parser])
    parser_profile.set_defaults(func=ssg_test_suite.profile.perform_profile_check)
    parser_profile.add_argument(
        "target",
        nargs="+",
        metavar="DSPROFILE",
        help=("Profiles to be tested, 'ALL' means every "
              "profile of particular benchmark will be "
              "evaluated."))

    # --- "rule" subcommand ----------------------------------------------
    parser_rule = subparsers.add_parser(
        "rule",
        help=("Testing remediations of particular "
              "rule for various situations - "
              "currently not supported "
              "by openscap!"),
        parents=[common_parser])
    parser_rule.set_defaults(func=ssg_test_suite.rule.perform_rule_check)
    parser_rule.add_argument(
        "target",
        nargs="+",
        metavar="RULE",
        help=(
            "Rule or rules to be tested. Special value 'ALL' means every "
            "rule-testing scenario will be evaluated. The SSG rule ID prefix "
            "is appended automatically if not provided. Wildcards to match "
            "multiple rules are accepted."
            )
        )
    parser_rule.add_argument(
        "--debug",
        dest="manual_debug",
        action="store_true",
        help=("If an error is encountered, all execution "
              "on the VM / container will pause to allow "
              "debugging."))
    parser_rule.add_argument(
        "--dontclean",
        dest="dont_clean",
        action="store_true",
        help="Do not remove html reports of successful runs")
    parser_rule.add_argument(
        "--scenarios",
        dest="scenarios_regex",
        default=None,
        help="Regular expression matching test scenarios to run")
    parser_rule.add_argument(
        "--profile",
        dest="scenarios_profile",
        default=None,
        help="Override the profile used for test scenarios."
             " Variable selections will be done according "
             "to this profile.")

    # --- "combined" subcommand ------------------------------------------
    parser_combined = subparsers.add_parser(
        "combined",
        help=("Tests all rules in a profile evaluating them "
              "against their test scenarios."),
        parents=[common_parser])
    parser_combined.set_defaults(func=ssg_test_suite.combined.perform_combined_check)
    parser_combined.add_argument(
        "--dontclean",
        dest="dont_clean",
        action="store_true",
        help="Do not remove html reports of successful runs")
    parser_combined.add_argument(
        "--scenarios",
        dest="scenarios_regex",
        default=None,
        help="Regular expression matching test scenarios to run")
    # Unlike the other subcommands, the target here is a single string.
    parser_combined.add_argument(
        "target",
        metavar="TARGET",
        help=("Profile whose rules are to be tested. Each rule selected "
              "in the profile will be evaluated against all its test "
              "scenarios."))

    return parser.parse_args(argv)
199
200
201
def get_logging_dir(options):
    """Determine the directory into which logs of this run are stored.

    When ``options.logdir`` is not set, the directory is placed under
    ``logs`` in the current working directory and is named after the
    subcommand, the kind of target ('ALL' or 'custom') and the current
    local time.  The candidate path is always passed through
    ``LogHelper.find_name`` and its result is returned.

    Args:
        options: Parsed command-line options; ``target``, ``subparser_name``
            and ``logdir`` are read.

    Returns:
        The value returned by ``LogHelper.find_name`` for the chosen path.
    """
    target = options.target
    # The "combined" subcommand stores the target as a plain string, for which
    # the "in" operator would perform a substring match - compare it exactly.
    if isinstance(target, (list, tuple)):
        all_requested = 'ALL' in target
    else:
        all_requested = target == 'ALL'
    body = 'ALL' if all_requested else 'custom'

    generic_logdir_stem = "{0}-{1}".format(options.subparser_name, body)

    if options.logdir is None:
        date_string = time.strftime('%Y-%m-%d-%H%M', time.localtime())
        candidate = os.path.join(
            os.getcwd(), 'logs', '{0}-{1}'.format(
                generic_logdir_stem, date_string))
    else:
        candidate = options.logdir

    return LogHelper.find_name(candidate)
219
220
221
def _print_available_benchmarks(xccdf_ids, n_xccdf_ids):
    """Log how many benchmarks there are, followed by a numbered list of them.

    Args:
        xccdf_ids: Sequence of benchmark reference IDs.
        n_xccdf_ids: Number of benchmarks to report; at most this many
            leading items of ``xccdf_ids`` are listed.
    """
    logging.info("The DataStream contains %s Benchmarks", n_xccdf_ids)
    # Pairing with range() caps the listing at n_xccdf_ids entries and never
    # indexes past the end of the sequence.
    for index, xccdf_id in zip(range(n_xccdf_ids), xccdf_ids):
        logging.info("%s - %s", index, xccdf_id)
225
226
227
def auto_select_xccdf_id(datastream, bench_number):
    """Pick the benchmark reference ID with the given number from a datastream.

    Args:
        datastream: Datastream passed to
            ``xml_operations.get_all_xccdf_ids_in_datastream``.
        bench_number: Zero-based position of the wanted benchmark in the
            list of IDs found in the datastream.

    Returns:
        The reference ID found at position ``bench_number``.

    Raises:
        RuntimeError: If the datastream contains no benchmark, or if
            ``bench_number`` is outside of the range of available benchmarks.
    """
    available_ids = xml_operations.get_all_xccdf_ids_in_datastream(datastream)
    count = len(available_ids)

    if not count:
        raise RuntimeError("The provided DataStream doesn't contain any Benchmark")

    number_is_valid = 0 <= bench_number < count
    several_available = count > 1

    # The overview is shown both when the selection is wrong and when there
    # was more than one benchmark to choose from.
    if several_available or not number_is_valid:
        _print_available_benchmarks(available_ids, count)
        logging.info("Selected Benchmark is {0}".format(bench_number))

    if not number_is_valid:
        raise RuntimeError("Please select a valid Benchmark number")

    if several_available:
        logging.info("To select a different Benchmark, "
                     "use --xccdf-id-number option.")

    return available_ids[bench_number]
250
251
252
def get_datastreams():
    """Return paths of datastreams found in the build directory.

    The build directory is ``../build`` relative to the directory of this
    file, and datastreams are the files in it matching ``ssg-*-ds.xml``.

    Returns:
        A sorted list of normalized paths; empty if nothing matches.
    """
    ds_glob = "ssg-*-ds.xml"
    # dirname is empty when the script is referenced by a bare file name.
    script_dir = os.path.dirname(__file__) or "."
    glob_pattern = os.path.join(script_dir, "..", "build", ds_glob)
    # glob yields matches in arbitrary order; sort them so that the result
    # (and messages built from it) is deterministic.
    return sorted(os.path.normpath(path) for path in glob(glob_pattern))
258
259
260
def get_unique_datastream():
    """Return the only datastream reported by ``get_datastreams``.

    Returns:
        Path of the datastream when exactly one was found.

    Raises:
        RuntimeError: If no datastream, or more than one, was found.
    """
    found = get_datastreams()
    how_many = len(found)

    if how_many == 1:
        return found[0]

    prefix = ("Autodetection of the datastream file is possible only when there is "
              "a single one in the build dir, but")
    if how_many == 0:
        detail = " there is none."
    else:
        detail = (" there are {0} of them. Use the --datastream option to select "
                  "e.g. {1}".format(how_many, found))
    raise RuntimeError(prefix + detail)
271
272
273
@contextlib.contextmanager
def datastream_in_stash(current_location):
    """Provide a temporary copy of a file for the duration of a ``with`` block.

    The contents of ``current_location`` are copied into a named temporary
    file whose name starts with ``ssgts-ds-``.  Both file handles are closed
    deterministically; closing the temporary file on exit of the block also
    removes it (the default behavior of ``tempfile.NamedTemporaryFile``).

    Args:
        current_location: Path of the file to copy.

    Yields:
        Path of the temporary copy.
    """
    with tempfile.NamedTemporaryFile(prefix="ssgts-ds-") as tfile:
        with open(current_location, "rb") as source:
            tfile.write(source.read())
        # Make sure the data is on disk before others open the file by name.
        tfile.flush()
        yield tfile.name
280
281
282
def normalize_passed_arguments(options):
    """Fill in derived and defaulted values of parsed options, in place.

    The following attributes of ``options`` are set or adjusted:

    * ``target`` - collapsed to ``['ALL']`` when 'ALL' was requested.
    * ``datastream`` - autodetected when not supplied.
    * ``xccdf_id`` - selected by ``xccdf_id_number`` when not supplied.
    * ``benchmark_id`` - inferred from the datastream and ``xccdf_id``.
    * ``test_env`` - Docker, Podman or libvirt VM test environment,
      depending on which backend option was given.
    * ``benchmark_cpes`` - applicable platforms of the benchmark.

    Args:
        options: Parsed command-line options (modified in place).

    Raises:
        RuntimeError: If the benchmark ID or the benchmark platforms cannot
            be inferred; errors raised by the datastream / benchmark
            autodetection helpers propagate as well.
    """
    target = options.target
    # The "combined" subcommand stores the target as a plain string, for which
    # the "in" operator would perform a substring match - compare it exactly.
    if isinstance(target, (list, tuple)):
        all_requested = 'ALL' in target
    else:
        all_requested = target == 'ALL'
    if all_requested:
        options.target = ['ALL']

    if not options.datastream:
        options.datastream = get_unique_datastream()

    if options.xccdf_id is None:
        options.xccdf_id = auto_select_xccdf_id(options.datastream,
                                                options.xccdf_id_number)
    try:
        bench_id = xml_operations.infer_benchmark_id_from_component_ref_id(
            options.datastream, options.xccdf_id)
        options.benchmark_id = bench_id
    except RuntimeError as exc:
        msg = "Error inferring benchmark ID from component refId: {}".format(str(exc))
        raise RuntimeError(msg)

    if options.docker:
        options.test_env = ssg_test_suite.test_env.DockerTestEnv(
            options.scanning_mode, options.docker)
        logging.info(
            "The base image option has been specified, "
            "choosing Docker-based test environment.")
    elif options.container:
        options.test_env = ssg_test_suite.test_env.PodmanTestEnv(
            options.scanning_mode, options.container)
        logging.info(
            "The base image option has been specified, "
            "choosing Podman-based test environment.")
    else:
        hypervisor, domain_name = options.libvirt
        # Possible hypervisor spec we have to catch: qemu+unix:///session
        # Anything without a URI driver prefix gets the qemu:/// one.
        if not re.match(r"[\w\+]+:///", hypervisor):
            hypervisor = "qemu:///" + hypervisor
        options.test_env = ssg_test_suite.test_env.VMTestEnv(
            options.scanning_mode, hypervisor, domain_name)
        logging.info(
            "The base image option has not been specified, "
            "choosing libvirt-based test environment.")

    try:
        benchmark_cpes = xml_operations.benchmark_get_applicable_platforms(
            options.datastream, options.benchmark_id
        )
        options.benchmark_cpes = benchmark_cpes
    except RuntimeError as exc:
        msg = "Error inferring platform from benchmark: {}".format(str(exc))
        raise RuntimeError(msg)
331
332
333
def main():
    """Run the test suite according to the command-line arguments.

    Parses and normalizes the options, sets up console and directory
    logging, makes a temporary copy of the datastream, optionally adjusts
    platforms in that copy, and finally calls the function selected by the
    subcommand.  Exits with status 1 if the options cannot be normalized.
    """
    options = parse_args()

    root_logger = logging.getLogger()
    # The logger itself has to let everything through (DEBUG), otherwise
    # messages would be silently dropped before reaching the handlers.
    root_logger.setLevel(logging.DEBUG)
    LogHelper.add_console_logger(root_logger, options.loglevel)

    try:
        normalize_passed_arguments(options)
    except RuntimeError as exc:
        logging.error(
            "Error occurred during options normalization: {}".format(str(exc)))
        sys.exit(1)

    # The logging directory depends on the normalized options, so it can be
    # set up only now.
    LogHelper.add_logging_dir(root_logger, get_logging_dir(options))

    with datastream_in_stash(options.datastream) as stashed_datastream:
        # From now on everything works with the temporary copy.
        options.datastream = stashed_datastream

        # The same path is passed twice to datastream_root; presumably it is
        # both the input and the output location - confirm in xml_operations.
        with xml_operations.datastream_root(stashed_datastream, stashed_datastream) as root:
            if options.remove_machine_only:
                xml_operations.remove_machine_platform(root)
            if options.add_platform:
                xml_operations.add_platform_to_benchmark(root, options.add_platform)

        options.func(options)
366
367
368
# Run the test suite only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
370