Passed
Branch master (53139b)
by Matěj
04:46 queued 02:30
created

test_suite.normalize_passed_arguments()   B

Complexity

Conditions 5

Size

Total Lines 34
Code Lines 27

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 27
nop 1
dl 0
loc 34
rs 8.7653
c 0
b 0
f 0
1
#!/usr/bin/env python2
2
from __future__ import print_function
3
4
import argparse
5
import logging
6
import os
7
import os.path
8
import time
9
import sys
10
11
ssg_dir = os.path.join(os.path.dirname(__file__), "..")
12
sys.path.append(ssg_dir)
13
14
from ssg_test_suite.log import LogHelper
15
import ssg_test_suite.oscap
16
import ssg_test_suite.test_env
17
import ssg_test_suite.profile
18
import ssg_test_suite.rule
19
from ssg_test_suite import xml_operations
20
21
22
def parse_args():
23
    parser = argparse.ArgumentParser()
24
25
    common_parser = argparse.ArgumentParser(add_help=False)
26
    common_parser.set_defaults(test_env=None)
27
28
    backends = common_parser.add_mutually_exclusive_group(required=True)
29
30
    backends.add_argument(
31
        "--docker", dest="docker", metavar="BASE_IMAGE",
32
        help="Use Docker test environment with this base image.")
33
34
    backends.add_argument(
35
        "--libvirt", dest="libvirt", metavar="HYPERVISOR DOMAIN", nargs=2,
36
        help="libvirt hypervisor and domain name. "
37
        "Example of a hypervisor domain name tuple: qemu:///system ssg-test-suite")
38
39
    common_parser.add_argument("--datastream",
40
                               dest="datastream",
41
                               metavar="DATASTREAM",
42
                               required=True,
43
                               help=("Path to the Source DataStream on this "
44
                                     "machine which is going to be tested"))
45
    common_parser.add_argument("--xccdf-id",
46
                               dest="xccdf_id",
47
                               metavar="REF-ID",
48
                               required=True,
49
                               help="Reference ID related to benchmark to be used."
50
                                    " Get one using 'oscap info <datastream>'.")
51
    common_parser.add_argument("--loglevel",
52
                               dest="loglevel",
53
                               metavar="LOGLEVEL",
54
                               default="INFO",
55
                               help="Default level of console output")
56
    common_parser.add_argument("--logdir",
57
                               dest="logdir",
58
                               metavar="LOGDIR",
59
                               default=None,
60
                               help="Directory to which all output is saved")
61
62
    common_parser.add_argument(
63
        "--mode",
64
        dest="scanning_mode",
65
        default="online",
66
        choices=("online", "offline"),
67
        help="What type of check to use - either "
68
        "Online check done by running oscap inside the concerned system, or "
69
        "offline check that examines the filesystem from the host "
70
        "(either may require extended privileges).")
71
72
    common_parser.add_argument(
73
        "--remediate-using",
74
        dest="remediate_using",
75
        default="oscap",
76
        choices=ssg_test_suite.oscap.REMEDIATION_RULE_RUNNERS.keys(),
77
        help="What type of remediations to use - openscap online one, "
78
        "or remediation done by using remediation roles "
79
        "that are saved to disk beforehand.")
80
81
    subparsers = parser.add_subparsers(dest="subparser_name",
82
                                       help="Subcommands: profile, rule")
83
    subparsers.required = True
84
85
    parser_profile = subparsers.add_parser("profile",
86
                                           help=("Testing profile-based "
87
                                                 "remediation applied on already "
88
                                                 "installed machine"),
89
                                           parents=[common_parser])
90
    parser_profile.set_defaults(func=ssg_test_suite.profile.perform_profile_check)
91
    parser_profile.add_argument("target",
92
                                nargs="+",
93
                                metavar="DSPROFILE",
94
                                help=("Profiles to be tested, 'ALL' means every "
95
                                      "profile of particular benchmark will be "
96
                                      "evaluated."))
97
98
    parser_rule = subparsers.add_parser("rule",
99
                                        help=("Testing remediations of particular "
100
                                              "rule for various situations - "
101
                                              "currently not supported "
102
                                              "by openscap!"),
103
                                        parents=[common_parser])
104
    parser_rule.set_defaults(func=ssg_test_suite.rule.perform_rule_check)
105
    parser_rule.add_argument("target",
106
                             nargs="+",
107
                             metavar="RULE",
108
                             help=("Rule to be tested, 'ALL' means every "
109
                                   "rule-testing scenario will be evaluated. Each "
110
                                   "target is handled as a substring - so you can "
111
                                   "ask for subset of all rules this way. (If you "
112
                                   "type ipv6 as a target, all rules containing "
113
                                   "ipv6 within id will be performed."))
114
    parser_rule.add_argument("--debug",
115
                             dest="manual_debug",
116
                             action="store_true",
117
                             help=("If an error is encountered, all execution "
118
                                   "on the VM / container will pause to allow "
119
                                   "debugging."))
120
    parser_rule.add_argument("--dontclean",
121
                             dest="dont_clean",
122
                             action="store_true",
123
                             help="Do not remove html reports of successful runs")
124
125
    return parser.parse_args()
126
127
128
def get_logging_dir(options):
129
    body = 'custom'
130
    if 'ALL' in options.target:
131
        body = 'ALL'
132
133
    generic_logdir_stem = "{0}-{1}".format(options.subparser_name, body)
134
135
    if options.logdir is None:
136
137
        date_string = time.strftime('%Y-%m-%d-%H%M', time.localtime())
138
        logging_dir = os.path.join(
139
            os.getcwd(), 'logs', '{0}-{1}'.format(
140
                generic_logdir_stem, date_string))
141
        logging_dir = LogHelper.find_name(logging_dir)
142
    else:
143
        logging_dir = LogHelper.find_name(options.logdir)
144
145
    return logging_dir
146
147
148
def normalize_passed_arguments(options):
149
    if 'ALL' in options.target:
150
        options.target = ['ALL']
151
152
    try:
153
        bench_id = xml_operations.infer_benchmark_id_from_component_ref_id(
154
            options.datastream, options.xccdf_id)
155
        options.benchmark_id = bench_id
156
    except RuntimeError as exc:
157
        msg = "Error inferring benchmark ID from component refId: {}".format(str(exc))
158
        raise RuntimeError(msg)
159
160
    if options.docker:
161
        options.test_env = ssg_test_suite.test_env.DockerTestEnv(
162
            options.scanning_mode, options.docker)
163
        logging.info(
164
            "The base image option has been specified, "
165
            "choosing Docker-based test environment.")
166
    else:
167
        hypervisor, domain_name = options.libvirt
168
        options.test_env = ssg_test_suite.test_env.VMTestEnv(
169
            options.scanning_mode, hypervisor, domain_name)
170
        logging.info(
171
            "The base image option has not been specified, "
172
            "choosing libvirt-based test environment.")
173
174
    try:
175
        benchmark_cpes = xml_operations.benchmark_get_applicable_platforms(
176
            options.datastream, options.benchmark_id
177
        )
178
        options.benchmark_cpes = benchmark_cpes
179
    except RuntimeError as exc:
180
        msg = "Error inferring platform from benchmark: {}".format(str(exc))
181
        raise RuntimeError(msg)
182
183
184
def main():
185
    options = parse_args()
186
187
    log = logging.getLogger()
188
    # this is general logger level - needs to be
189
    # debug otherwise it cuts silently everything
190
    log.setLevel(logging.DEBUG)
191
192
    LogHelper.add_console_logger(log, options.loglevel)
193
194
    try:
195
        normalize_passed_arguments(options)
196
    except RuntimeError as exc:
197
        msg = "Error occurred during options normalization: {}".format(str(exc))
198
        logging.error(msg)
199
        sys.exit(1)
200
    # logging dir needs to be created based on other options
201
    # thus we have to postprocess it
202
203
    logging_dir = get_logging_dir(options)
204
205
    LogHelper.add_logging_dir(log, logging_dir)
206
207
    options.func(options)
208
209
210
if __name__ == "__main__":
211
    main()
212