Pull Request — master (#4657): Passed
Created by Matěj at 02:24

test_suite.get_unique_datastream()   Grade: A

Complexity:   Conditions 3
Size:         Total Lines 11, Code Lines 10
Duplication:  Lines 0, Ratio 0 %
Importance:   Changes 0

Metric   Value
cc       3
eloc     10
nop      0
dl       0
loc      11
rs       9.9
c        0
b        0
f        0

#!/usr/bin/env python2
from __future__ import print_function

import argparse
import logging
import os
import os.path
import time
import sys
from glob import glob

ssg_dir = os.path.join(os.path.dirname(__file__), "..")
sys.path.append(ssg_dir)

from ssg_test_suite.log import LogHelper
import ssg_test_suite.oscap
import ssg_test_suite.test_env
import ssg_test_suite.profile
import ssg_test_suite.rule
import ssg_test_suite.combined
from ssg_test_suite import xml_operations


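# Build the command-line interface: common options shared by every backend
# (--docker / --container / --libvirt, datastream selection, logging, scanning
# and remediation mode), plus the "profile", "rule" and "combined" subcommands,
# each wired to its check function via set_defaults(func=...).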
def parse_args():
    parser = argparse.ArgumentParser()

    common_parser = argparse.ArgumentParser(add_help=False)
    common_parser.set_defaults(test_env=None)

    backends = common_parser.add_mutually_exclusive_group(required=True)

    backends.add_argument(
        "--docker", dest="docker", metavar="BASE_IMAGE",
        help="Use Docker test environment with this base image.")
    backends.add_argument(
        "--container", dest="container", metavar="BASE_IMAGE",
        help="Use container test environment with this base image.")

    backends.add_argument(
        "--libvirt", dest="libvirt", metavar="HYPERVISOR DOMAIN", nargs=2,
        help="libvirt hypervisor and domain name. "
        "Example of a hypervisor domain name tuple: qemu:///system ssg-test-suite")

    common_parser.add_argument("--datastream",
                               dest="datastream",
                               metavar="DATASTREAM",
                               help=("Path to the Source DataStream on this "
                                     "machine which is going to be tested"))
    benchmarks = common_parser.add_mutually_exclusive_group()
    benchmarks.add_argument("--xccdf-id",
                            dest="xccdf_id",
                            metavar="REF-ID",
                            default=None,
                            help="Reference ID related to benchmark to "
                                 "be used.")
    benchmarks.add_argument("--xccdf-id-number",
                            dest="xccdf_id_number",
                            metavar="REF-ID-SELECT",
                            type=int,
                            default=0,
                            help="Selection number of reference ID related "
                                 "to benchmark to be used.")
    common_parser.add_argument("--loglevel",
                               dest="loglevel",
                               metavar="LOGLEVEL",
                               default="INFO",
                               help="Default level of console output")
    common_parser.add_argument("--logdir",
                               dest="logdir",
                               metavar="LOGDIR",
                               default=None,
                               help="Directory to which all output is saved")

    common_parser.add_argument(
        "--mode",
        dest="scanning_mode",
        default="online",
        choices=("online", "offline"),
        help="What type of check to use - either "
        "Online check done by running oscap inside the concerned system, or "
        "offline check that examines the filesystem from the host "
        "(either may require extended privileges).")

    common_parser.add_argument(
        "--remediate-using",
        dest="remediate_using",
        default="oscap",
        choices=ssg_test_suite.oscap.REMEDIATION_RULE_RUNNERS.keys(),
        help="What type of remediations to use - openscap online one, "
        "or remediation done by using remediation roles "
        "that are saved to disk beforehand.")

    subparsers = parser.add_subparsers(dest="subparser_name",
                                       help="Subcommands: profile, rule, combined")
    subparsers.required = True

    parser_profile = subparsers.add_parser("profile",
                                           help=("Testing profile-based "
                                                 "remediation applied on an already "
                                                 "installed machine"),
                                           parents=[common_parser])
    parser_profile.set_defaults(func=ssg_test_suite.profile.perform_profile_check)
    parser_profile.add_argument("target",
                                nargs="+",
                                metavar="DSPROFILE",
                                help=("Profiles to be tested, 'ALL' means every "
                                      "profile of the particular benchmark will be "
                                      "evaluated."))

    parser_rule = subparsers.add_parser("rule",
                                        help=("Testing remediations of a particular "
                                              "rule for various situations - "
                                              "currently not supported "
                                              "by openscap!"),
                                        parents=[common_parser])
    parser_rule.set_defaults(func=ssg_test_suite.rule.perform_rule_check)
    parser_rule.add_argument("target",
                             nargs="+",
                             metavar="RULE",
                             help=("Rule to be tested, 'ALL' means every "
                                   "rule-testing scenario will be evaluated. Each "
                                   "target is handled as a substring - so you can "
                                   "ask for a subset of all rules this way. (If you "
                                   "type ipv6 as a target, all rules containing "
                                   "ipv6 within their ID will be evaluated.)"))
    parser_rule.add_argument("--debug",
                             dest="manual_debug",
                             action="store_true",
                             help=("If an error is encountered, all execution "
                                   "on the VM / container will pause to allow "
                                   "debugging."))
    parser_rule.add_argument("--dontclean",
                             dest="dont_clean",
                             action="store_true",
                             help="Do not remove HTML reports of successful runs")
    parser_rule.add_argument("--scenarios",
                             dest="scenarios_regex",
                             default=None,
                             help="Regular expression matching test scenarios to run")
    parser_rule.add_argument("--profile",
                             dest="scenarios_profile",
                             default=None,
                             help="Override the profile used for test scenarios."
                                  " Variable selections will be done according "
                                  "to this profile.")

    parser_combined = subparsers.add_parser("combined",
                                            help=("Tests all rules in a profile, evaluating them "
                                                  "against their test scenarios."),
                                            parents=[common_parser])
    parser_combined.set_defaults(func=ssg_test_suite.combined.perform_combined_check)
    parser_combined.add_argument("--dontclean",
                                 dest="dont_clean",
                                 action="store_true",
                                 help="Do not remove HTML reports of successful runs")
    parser_combined.add_argument("--scenarios",
                                 dest="scenarios_regex",
                                 default=None,
                                 help="Regular expression matching test scenarios to run")
    parser_combined.add_argument("target",
                                 metavar="TARGET",
                                 help=("Profile whose rules are to be tested. Each rule selected "
                                       "in the profile will be evaluated against all its test "
                                       "scenarios."))

    return parser.parse_args()


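# Derive the directory for logs from the subcommand and target ('ALL' or 'custom');
# when --logdir is not given, a timestamped directory under ./logs is used.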
def get_logging_dir(options):
    body = 'custom'
    if 'ALL' in options.target:
        body = 'ALL'

    generic_logdir_stem = "{0}-{1}".format(options.subparser_name, body)

    if options.logdir is None:

        date_string = time.strftime('%Y-%m-%d-%H%M', time.localtime())
        logging_dir = os.path.join(
            os.getcwd(), 'logs', '{0}-{1}'.format(
                generic_logdir_stem, date_string))
        logging_dir = LogHelper.find_name(logging_dir)
    else:
        logging_dir = LogHelper.find_name(options.logdir)

    return logging_dir


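# Log every XCCDF Benchmark found in the datastream together with its selection number.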
def _print_available_benchmarks(xccdf_ids, n_xccdf_ids):
    logging.info("The DataStream contains {0} Benchmarks".format(n_xccdf_ids))
    for i in range(0, n_xccdf_ids):
        logging.info("{0} - {1}".format(i, xccdf_ids[i]))


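# Pick the XCCDF Benchmark ID selected by --xccdf-id-number, failing with a helpful
# message when the datastream contains no Benchmark or the number is out of range.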
def auto_select_xccdf_id(datastream, bench_number):
    xccdf_ids = xml_operations.get_all_xccdf_ids_in_datastream(datastream)
    n_xccdf_ids = len(xccdf_ids)

    if n_xccdf_ids == 0:
        msg = ("The provided DataStream doesn't contain any Benchmark")
        raise RuntimeError(msg)

    if bench_number < 0 or bench_number >= n_xccdf_ids:
        _print_available_benchmarks(xccdf_ids, n_xccdf_ids)
        logging.info("Selected Benchmark is {0}".format(bench_number))

        msg = ("Please select a valid Benchmark number")
        raise RuntimeError(msg)

    if n_xccdf_ids > 1:
        _print_available_benchmarks(xccdf_ids, n_xccdf_ids)
        logging.info("Selected Benchmark is {0}".format(bench_number))

        logging.info("To select a different Benchmark, "
                     "use --xccdf-id-number option.")

    return xccdf_ids[bench_number]


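# Collect all built datastream files (ssg-*-ds.xml) from the build directory.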
def get_datastreams():
    ds_glob = "ssg-*-ds.xml"
    build_dir_path = [os.path.dirname(__file__) or ".", "..", "build"]
    glob_pattern = os.path.sep.join(build_dir_path + [ds_glob])
    datastreams = [os.path.normpath(p) for p in glob(glob_pattern)]
    return datastreams


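# Return the single built datastream, or raise an error when there is none or
# more than one, so that the caller has to pass --datastream explicitly.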
def get_unique_datastream():
    datastreams = get_datastreams()
    if len(datastreams) == 1:
        return datastreams[0]
    msg = ("Autodetection of the datastream file is possible only when there is "
           "a single one in the build dir, but")
    if not datastreams:
        raise RuntimeError(msg + " there is none.")
    raise RuntimeError(
        msg + " there are {0} of them. Use the --datastream option to select "
        "one, e.g. {1}".format(len(datastreams), datastreams))


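# Post-process the parsed options: autodetect the datastream and XCCDF ID when they
# were not given, resolve the benchmark ID and its applicable platforms (CPEs), and
# instantiate the Docker, Podman or libvirt test environment.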
def normalize_passed_arguments(options):
    if 'ALL' in options.target:
        options.target = ['ALL']

    if not options.datastream:
        options.datastream = get_unique_datastream()

    if options.xccdf_id is None:
        options.xccdf_id = auto_select_xccdf_id(options.datastream,
                                                options.xccdf_id_number)
    try:
        bench_id = xml_operations.infer_benchmark_id_from_component_ref_id(
            options.datastream, options.xccdf_id)
        options.benchmark_id = bench_id
    except RuntimeError as exc:
        msg = "Error inferring benchmark ID from component refId: {}".format(str(exc))
        raise RuntimeError(msg)

    if options.docker:
        options.test_env = ssg_test_suite.test_env.DockerTestEnv(
            options.scanning_mode, options.docker)
        logging.info(
            "The base image option has been specified, "
            "choosing Docker-based test environment.")
    elif options.container:
        options.test_env = ssg_test_suite.test_env.PodmanTestEnv(
            options.scanning_mode, options.container)
        logging.info(
            "The base image option has been specified, "
            "choosing Podman-based test environment.")
    else:
        hypervisor, domain_name = options.libvirt
        options.test_env = ssg_test_suite.test_env.VMTestEnv(
            options.scanning_mode, hypervisor, domain_name)
        logging.info(
            "The base image option has not been specified, "
            "choosing libvirt-based test environment.")

    try:
        benchmark_cpes = xml_operations.benchmark_get_applicable_platforms(
            options.datastream, options.benchmark_id
        )
        options.benchmark_cpes = benchmark_cpes
    except RuntimeError as exc:
        msg = "Error inferring platform from benchmark: {}".format(str(exc))
        raise RuntimeError(msg)


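# Entry point: parse the options, set up console and directory logging, normalize
# the options and dispatch to the selected subcommand's check function.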
def main():
    options = parse_args()

    log = logging.getLogger()
    # The root logger level needs to be DEBUG, otherwise everything
    # below the threshold is silently dropped.
    log.setLevel(logging.DEBUG)

    LogHelper.add_console_logger(log, options.loglevel)

    try:
        normalize_passed_arguments(options)
    except RuntimeError as exc:
        msg = "Error occurred during options normalization: {}".format(str(exc))
        logging.error(msg)
        sys.exit(1)
    # The logging dir depends on other options,
    # so it has to be determined after normalization.

    logging_dir = get_logging_dir(options)

    LogHelper.add_logging_dir(log, logging_dir)

    options.func(options)


if __name__ == "__main__":
    main()
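
For illustration only, here are two ways this script could be invoked; the base
image name, datastream path, profile name and rule substring are made-up
placeholders, while the options themselves match the parse_args() definition above:

    # Test the remediations of all rules whose ID contains "sshd" in a libvirt VM.
    ./test_suite.py rule --libvirt qemu:///system ssg-test-suite \
        --datastream build/ssg-rhel8-ds.xml --dontclean sshd

    # Evaluate every rule of a profile against its test scenarios in a Docker container.
    ./test_suite.py combined --docker my-base-image --mode offline my_profile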