Test Failed
Push — master ( 79e936...04af7d )
by Jan
02:32 queued 12s

tests.test_suite.get_datastreams()   A

Complexity

Conditions 1

Size

Total Lines 6
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0

Metric    Value
cc        1
eloc      6
nop       0
dl        0
loc       6
rs        10
c         0
b         0
f         0
#!/usr/bin/env python
from __future__ import print_function

import argparse
import textwrap
import logging
import os
import os.path
import time
import sys
from glob import glob
import re
import contextlib
import tempfile

ssg_dir = os.path.join(os.path.dirname(__file__), "..")
sys.path.append(ssg_dir)

from ssg_test_suite.log import LogHelper
import ssg_test_suite.oscap
import ssg_test_suite.test_env
import ssg_test_suite.profile
import ssg_test_suite.rule
import ssg_test_suite.combined
from ssg_test_suite import xml_operations


def parse_args():
    parser = argparse.ArgumentParser()

    common_parser = argparse.ArgumentParser(add_help=False)
    common_parser.set_defaults(test_env=None)

    backends = common_parser.add_mutually_exclusive_group(required=True)

    backends.add_argument(
        "--docker", dest="docker", metavar="BASE_IMAGE",
        help="Use Docker test environment with this base image.")
    backends.add_argument(
        "--container", dest="container", metavar="BASE_IMAGE",
        help="Use container test environment with this base image.")

    backends.add_argument(
        "--libvirt", dest="libvirt", metavar=("HYPERVISOR", "DOMAIN"), nargs=2,
        help="libvirt hypervisor and domain name. When the leading URI driver protocol "
        "is omitted from the hypervisor, qemu:/// protocol is assumed. "
        "Example of a hypervisor domain name tuple: system ssg-test-suite")

    common_parser.add_argument(
        "--datastream", dest="datastream", metavar="DATASTREAM",
        help="Path to the Source DataStream on this machine which is going to be tested. "
        "If not supplied, autodetection is attempted by looking into the build directory.")

    common_parser.add_argument(
        "--product", dest="product", metavar="PRODUCT", default=None,
        help="Product to interpret tests as being run under; autodetected from datastream "
        "if it follows the ssg-<product>-ds*.xml naming convention.")

    benchmarks = common_parser.add_mutually_exclusive_group()
    benchmarks.add_argument("--xccdf-id",
                            dest="xccdf_id",
                            metavar="REF-ID",
                            default=None,
                            help="Reference ID related to benchmark to "
                                 "be used.")
    benchmarks.add_argument("--xccdf-id-number",
                            dest="xccdf_id_number",
                            metavar="REF-ID-SELECT",
                            type=int,
                            default=0,
                            help="Selection number of reference ID related "
                                 "to benchmark to be used.")
    common_parser.add_argument(
            "--add-platform",
            metavar="<CPE REGEX>",
            default=None,
            help="Find all CPEs that are present in local OpenSCAP's CPE dictionary "
            "that match the provided regex, "
            "and add them as platforms to all datastream benchmarks. "
            "If the regex doesn't match anything, it will be treated "
            "as a literal CPE, and added as a platform. "
            "For example, use 'cpe:/o:fedoraproject:fedora:30' or 'enterprise_linux'.")
    common_parser.add_argument(
            "--remove-machine-only",
            default=False,
            action="store_true",
            help="Removes machine-only platform constraint from rules "
            "to enable testing these rules on container backends.")
    common_parser.add_argument("--loglevel",
                               dest="loglevel",
                               metavar="LOGLEVEL",
                               default="INFO",
                               help="Default level of console output")
    common_parser.add_argument("--logdir",
                               dest="logdir",
                               metavar="LOGDIR",
                               default=None,
                               help="Directory to which all output is saved")

    common_parser.add_argument(
        "--mode",
        dest="scanning_mode",
        default="online",
        choices=("online", "offline"),
        help="What type of check to use - either "
        "Online check done by running oscap inside the concerned system, or "
        "offline check that examines the filesystem from the host "
        "(either may require extended privileges).")

    common_parser.add_argument(
        "--remediate-using",
        dest="remediate_using",
        default="oscap",
        choices=ssg_test_suite.oscap.REMEDIATION_RULE_RUNNERS.keys(),
        help="What type of remediations to use - openscap online one, "
        "or remediation done by using remediation roles "
        "that are saved to disk beforehand.")

    subparsers = parser.add_subparsers(dest="subparser_name",
                                       help="Subcommands: profile, rule, combined")
    subparsers.required = True

    parser_profile = subparsers.add_parser("profile",
                                           formatter_class=argparse.RawDescriptionHelpFormatter,
                                           epilog=textwrap.dedent("""\
                    In case that tested profile contains rules which might prevent root ssh access
                    to the testing VM consider unselecting these rules. To unselect certain rules
                    from a datastream use `ds_unselect_rules.sh` script. List of such rules already
                    exists, see `unselect_rules_list` file.
                    Example usage:
                        ./ds_unselect_rules.sh ../build/ssg-fedora-ds.xml unselect_rules_list
                                           """),
                                           help=("Testing profile-based "
                                                 "remediation applied on already "
                                                 "installed machine"),
                                           parents=[common_parser])
    parser_profile.set_defaults(func=ssg_test_suite.profile.perform_profile_check)
    parser_profile.add_argument("target",
                                nargs="+",
                                metavar="DSPROFILE",
                                help=("Profiles to be tested, 'ALL' means every "
                                      "profile of particular benchmark will be "
                                      "evaluated."))

    parser_rule = subparsers.add_parser("rule",
                                        help=("Testing remediations of particular "
                                              "rule for various situations - "
                                              "currently not supported "
                                              "by openscap!"),
                                        parents=[common_parser])
    parser_rule.set_defaults(func=ssg_test_suite.rule.perform_rule_check)
    parser_rule.add_argument(
        "target",
        nargs="+",
        metavar="RULE",
        help=(
            "Rule or rules to be tested. Special value 'ALL' means every "
            "rule-testing scenario will be evaluated. The SSG rule ID prefix "
            "is appended automatically if not provided. Wildcards to match "
            "multiple rules are accepted."
            )
        )
    parser_rule.add_argument("--debug",
                             dest="manual_debug",
                             action="store_true",
                             help=("If an error is encountered, all execution "
                                   "on the VM / container will pause to allow "
                                   "debugging."))
    parser_rule.add_argument("--dontclean",
                             dest="dont_clean",
                             action="store_true",
                             help="Do not remove html reports of successful runs")
    parser_rule.add_argument("--scenarios",
                             dest="scenarios_regex",
                             default=None,
                             help="Regular expression matching test scenarios to run")
    parser_rule.add_argument("--profile",
                             dest="scenarios_profile",
                             default=None,
                             help="Override the profile used for test scenarios."
                                  " Variable selections will be done according "
                                  "to this profile.")

    parser_combined = subparsers.add_parser("combined",
                                            help=("Tests all rules in a profile evaluating them "
                                                  "against their test scenarios."),
                                            parents=[common_parser])
    parser_combined.set_defaults(func=ssg_test_suite.combined.perform_combined_check)
    parser_combined.add_argument("--dontclean",
                                 dest="dont_clean",
                                 action="store_true",
                                 help="Do not remove html reports of successful runs")
    parser_combined.add_argument("--scenarios",
                                 dest="scenarios_regex",
                                 default=None,
                                 help="Regular expression matching test scenarios to run")
    parser_combined.add_argument("target",
                                 metavar="TARGET",
                                 help=("Profile whose rules are to be tested. Each rule selected "
                                       "in the profile will be evaluated against all its test "
                                       "scenarios."))

    return parser.parse_args()


def get_logging_dir(options):
    body = 'custom'
    if 'ALL' in options.target:
        body = 'ALL'

    generic_logdir_stem = "{0}-{1}".format(options.subparser_name, body)

    if options.logdir is None:

        date_string = time.strftime('%Y-%m-%d-%H%M', time.localtime())
        logging_dir = os.path.join(
            os.getcwd(), 'logs', '{0}-{1}'.format(
                generic_logdir_stem, date_string))
        logging_dir = LogHelper.find_name(logging_dir)
    else:
        logging_dir = LogHelper.find_name(options.logdir)

    return logging_dir


def _print_available_benchmarks(xccdf_ids, n_xccdf_ids):
    logging.info("The DataStream contains {0} Benchmarks".format(n_xccdf_ids))
    for i in range(0, n_xccdf_ids):
        logging.info("{0} - {1}".format(i, xccdf_ids[i]))


def auto_select_xccdf_id(datastream, bench_number):
    xccdf_ids = xml_operations.get_all_xccdf_ids_in_datastream(datastream)
    n_xccdf_ids = len(xccdf_ids)

    if n_xccdf_ids == 0:
        msg = ("The provided DataStream doesn't contain any Benchmark")
        raise RuntimeError(msg)

    if bench_number < 0 or bench_number >= n_xccdf_ids:
        _print_available_benchmarks(xccdf_ids, n_xccdf_ids)
        logging.info("Selected Benchmark is {0}".format(bench_number))

        msg = ("Please select a valid Benchmark number")
        raise RuntimeError(msg)

    if n_xccdf_ids > 1:
        _print_available_benchmarks(xccdf_ids, n_xccdf_ids)
        logging.info("Selected Benchmark is {0}".format(bench_number))

        logging.info("To select a different Benchmark, "
                     "use --xccdf-id-number option.")

    return xccdf_ids[bench_number]


def get_datastreams():
    # Glob for built datastreams (ssg-*-ds.xml) in ../build relative to this script.
    ds_glob = "ssg-*-ds.xml"
    build_dir_path = [os.path.dirname(__file__) or ".", "..", "build"]
    glob_pattern = os.path.sep.join(build_dir_path + [ds_glob])
    datastreams = [os.path.normpath(p) for p in glob(glob_pattern)]
    return datastreams


def get_unique_datastream():
    datastreams = get_datastreams()
    if len(datastreams) == 1:
        return datastreams[0]
    msg = ("Autodetection of the datastream file is possible only when there is "
           "a single one in the build dir, but")
    if not datastreams:
        raise RuntimeError(msg + " there is none.")
    raise RuntimeError(
        msg + " there are {0} of them. Use the --datastream option to select "
        "e.g. {1}".format(len(datastreams), datastreams))


@contextlib.contextmanager
def datastream_in_stash(current_location):
    # Work on a temporary copy of the datastream so that in-place XML edits
    # (e.g. --add-platform, --remove-machine-only) never touch the original file.
    tfile = tempfile.NamedTemporaryFile(prefix="ssgts-ds-")

    tfile.write(open(current_location, "rb").read())
    tfile.flush()
    yield tfile.name


def normalize_passed_arguments(options):
    if 'ALL' in options.target:
        options.target = ['ALL']

    if not options.datastream:
        options.datastream = get_unique_datastream()

    if not options.product and options.datastream:
        product_regex = re.compile(r'^.*ssg-([a-zA-Z0-9]*)-(ds|ds-1\.2)\.xml$')
        match = product_regex.match(options.datastream)
        if not match:
            msg = "Unable to detect product without explicit --product: "
            msg += "datastream {0} lacks product name".format(datastream)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable datastream does not seem to be defined.
Loading history...
300
            raise RuntimeError(msg)
        options.product = match.group(1)

    if options.xccdf_id is None:
        options.xccdf_id = auto_select_xccdf_id(options.datastream,
                                                options.xccdf_id_number)
    try:
        bench_id = xml_operations.infer_benchmark_id_from_component_ref_id(
            options.datastream, options.xccdf_id)
        options.benchmark_id = bench_id
    except RuntimeError as exc:
        msg = "Error inferring benchmark ID from component refId: {}".format(str(exc))
        raise RuntimeError(msg)

    if options.docker:
        options.test_env = ssg_test_suite.test_env.DockerTestEnv(
            options.scanning_mode, options.docker)
        logging.info(
            "The base image option has been specified, "
            "choosing Docker-based test environment.")
    elif options.container:
        options.test_env = ssg_test_suite.test_env.PodmanTestEnv(
            options.scanning_mode, options.container)
        logging.info(
            "The base image option has been specified, "
            "choosing Podman-based test environment.")
    else:
        hypervisor, domain_name = options.libvirt
        # Possible hypervisor spec we have to catch: qemu+unix:///session
        if not re.match(r"[\w\+]+:///", hypervisor):
            hypervisor = "qemu:///" + hypervisor
        options.test_env = ssg_test_suite.test_env.VMTestEnv(
            options.scanning_mode, hypervisor, domain_name)
        logging.info(
            "The base image option has not been specified, "
            "choosing libvirt-based test environment.")

    # Add in product to the test environment. This is independent of actual
    # test environment type so we do it after creation.
    options.test_env.product = options.product

    try:
        benchmark_cpes = xml_operations.benchmark_get_applicable_platforms(
            options.datastream, options.benchmark_id
        )
        options.benchmark_cpes = benchmark_cpes
    except RuntimeError as exc:
        msg = "Error inferring platform from benchmark: {}".format(str(exc))
        raise RuntimeError(msg)


def main():
    options = parse_args()

    log = logging.getLogger()
    # this is general logger level - needs to be
    # debug otherwise it cuts silently everything
    log.setLevel(logging.DEBUG)

    LogHelper.add_console_logger(log, options.loglevel)

    try:
        normalize_passed_arguments(options)
    except RuntimeError as exc:
        msg = "Error occurred during options normalization: {}".format(str(exc))
        logging.error(msg)
        sys.exit(1)
    # logging dir needs to be created based on other options
    # thus we have to postprocess it

    logging_dir = get_logging_dir(options)

    LogHelper.add_logging_dir(log, logging_dir)

    with datastream_in_stash(options.datastream) as stashed_datastream:
        options.datastream = stashed_datastream

        with xml_operations.datastream_root(stashed_datastream, stashed_datastream) as root:
            if options.remove_machine_only:
                xml_operations.remove_machine_platform(root)
                xml_operations.remove_machine_remediation_condition(root)
            if options.add_platform:
                xml_operations.add_platform_to_benchmark(root, options.add_platform)

        options.func(options)


if __name__ == "__main__":
    main()
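
For reference, a minimal invocation sketch based on the argument parser above; the script name (test_suite.py), the profile ID (ospp), the rule ID (sshd_disable_root_login) and the container image (fedora) are illustrative assumptions, not values taken from this report:

    # assumed invocation: evaluate every rule selected in a profile against a libvirt VM
    ./test_suite.py combined --libvirt system ssg-test-suite \
        --datastream ../build/ssg-fedora-ds.xml ospp

    # assumed invocation: run one rule's scenarios in a container, dropping machine-only constraints
    ./test_suite.py rule --container fedora --remove-machine-only sshd_disable_root_login

One backend option (--docker, --container, or --libvirt) is always required, and --datastream may be omitted when exactly one ssg-*-ds.xml is present in the build directory, in which case get_unique_datastream() picks it up automatically.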