Test Failed
Push — master ( 94c4fe...e83b9a )
by Jan
02:20 queued 14s
created

utils.add_platform_rule.entity_value()   A

Complexity

Conditions 2

Size

Total Lines 5
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
cc 2
eloc 4
nop 1
dl 0
loc 5
ccs 0
cts 4
cp 0
crap 6
rs 10
c 0
b 0
f 0
1
#!/usr/bin/python3
2
3
import argparse
import functools
import os
import pathlib
import subprocess
import sys
import textwrap
import time
10
11
# Long description shown by ``--help`` (passed to argparse in main()).
# The shell line continuations in the example are written as ``\\`` because a
# single backslash at the end of a line inside a non-raw string literal would
# be consumed as a line continuation and vanish from the help output.
PROG_DESC = (''' Create and test content files for Kubernetes API checks.

This script is intended to help content writers create a new application check
for OCP4/Kubernetes.

- The 'create' subcommand creates the initial files for a new rule and fetches
  the raw URL of the object in question (unless you specify the URL).

- The 'test' subcommand builds your content locally and tests directly using an
  openscap podman container. The scan container will test against yaml files
  staged under --objectdir.

- The 'cluster-test' subcommand pushes the content to your cluster, and then
  runs a Platform scan for your rule with compliance-operator.

Example workflow:

$ utils/add_platform_rule.py create --rule=ocp_proxy_has_ca \\
  --type="proxies.config" --name="cluster" \\
  --yamlpath=".spec.trustedCA.name" --match="[a-zA-Z0-9]*"
creating check for "/apis/config.openshift.io/v1/proxies/cluster" with yamlpath ".spec.trustedCA.name" satisfying match of "[a-zA-Z0-9]*"
wrote applications/openshift/ocp_proxy_has_ca/rule.yml

$ mkdir -p /tmp/apis/config.openshift.io/v1/proxies/
$ oc get proxies.config/cluster -o yaml > /tmp/apis/config.openshift.io/v1/proxies/cluster
$ utils/add_platform_rule.py test --rule=ocp_proxy_has_ca
testing rule ocp_proxy_has_ca locally
Title
        None
Rule
        xccdf_org.ssgproject.content_rule_ocp_proxy_has_ca
Ident
        CCE-84209-6
Result
        pass

$ utils/add_platform_rule.py cluster-test --rule=ocp_proxy_has_ca
testing rule ocp_proxy_has_ca in-cluster
deploying compliance-operator
pushing image build to cluster
waiting for cleanup from previous test run
output from last phase check: LAUNCHING NOT-AVAILABLE
output from last phase check: RUNNING NOT-AVAILABLE
output from last phase check: AGGREGATING NOT-AVAILABLE
output from last phase check: DONE COMPLIANT
COMPLIANT

''')
59
60
# Directory (relative to the working directory) under which rule directories
# are created and searched.
PLATFORM_RULE_DIR = 'applications/openshift'
# Container image used by the local 'test' sub-command.
OSCAP_TEST_IMAGE = 'quay.io/compliance-operator/openscap-ocp:1.3.4'
# oscap command line run inside the container; %s is the verbosity level.
OSCAP_CMD_TEMPLATE = (
    'oscap xccdf eval --verbose %s --fetch-remote-resources '
    '--profile xccdf_org.ssgproject.content_profile_test '
    '--results-arf /tmp/report-arf.xml /content/ssg-ocp4-ds.xml'
)
# Location of the temporary single-rule profile written before a build.
PROFILE_PATH = 'products/ocp4/profiles/test.profile'
64
65
# YAML written by testFunc() as a stand-in 'openshift-apiserver'
# clusteroperator object, so the implicit ocp4 version check passes locally.
MOCK_VERSION = ('''status:
  versions:
  - name: operator
    version: 4.6.0-0.ci-2020-06-15-112708
  - name: openshift-apiserver
    version: 4.6.0-0.ci-2020-06-15-112708
''')
72
73
# str.format() template for a new rule.yml.
# Doubled braces are literal braces after formatting, so ``{{}}`` renders as
# ``{}`` and the sextuple braces render as ``{{{ ... }}}`` in the written file.
# NOTE: the template has no {IDENT} or {NEGATE} placeholder, so values passed
# under those names to format() do not appear in the output.
RULE_TEMPLATE = ('''prodtype: ocp4

title: {TITLE}

description: {DESC}

rationale: TBD

identifiers: {{}}

severity: {SEV}

warnings:
- general: |-
    {{{{{{ openshift_cluster_setting("{URL}") | indent(4) }}}}}}

template:
  name: yamlfile_value
  vars:
    ocp_data: "true"{ENTITY_CHECK}{CHECK_EXISTENCE}
    filepath: {URL}
    yamlpath: "{YAMLPATH}"
    values:
    - value: "{MATCH}"{CHECK_TYPE}
''')
98
99
100
def operation_value(value):
    """Return the YAML fragment that turns a value match into a regex match.

    When ``value`` is truthy the fragment adds the ``operation`` and ``type``
    keys (indented to sit under a ``values`` list item); otherwise an empty
    string is returned so nothing is appended to the template.
    """
    if value:
        return '\n      operation: "pattern match"\n      type: "string"'
    return ''
105
106
107
def entity_value(value):
    """Return the ``entity_check`` YAML line for ``value``.

    Returns an empty string when ``value`` is None; any other value
    (including an empty string) is rendered inside double quotes.
    """
    if value is not None:
        return '\n    entity_check: "%s"' % value
    return ''
112
113
def check_existence_value(value):
    """Return the ``check_existence`` YAML line for ``value``.

    Returns an empty string when ``value`` is None; any other value
    (including an empty string) is rendered inside double quotes.
    """
    if value is not None:
        return '\n    check_existence: "%s"' % value
    return ''
118
119
120
# str.format() template for the temporary profile that selects exactly one
# rule; {RULE_NAME} is used both in the title and as the only selection.
PROFILE_TEMPLATE = ('''documentation_complete: true

title: 'Test Profile for {RULE_NAME}'

platform: ocp4

description: Test Profile
selections:
- {RULE_NAME}
''')
130
131
132
# str.format() template for the ComplianceScan object (named "test") that
# clusterTestFunc() pipes to ``oc apply``; {PROFILE} is the profile id to scan.
TEST_SCAN_TEMPLATE = ('''apiVersion: compliance.openshift.io/v1alpha1
kind: ComplianceScan
metadata:
  name: test
spec:
  scanType: Platform
  profile: {PROFILE}
  content: ssg-ocp4-ds.xml
  contentImage: image-registry.openshift-image-registry.svc:5000/openshift-compliance/openscap-ocp4-ds:latest
  debug: true
''')
143
144
145
def needs_oc(func):
    """Decorate a sub-command handler so it only runs when ``oc`` is found.

    The returned wrapper takes the parsed ``args``. If which('oc') finds
    nothing it prints a message and returns 1 without calling ``func``;
    otherwise it returns whatever ``func(args)`` returns.
    """
    # wraps() keeps the handler's __name__/__doc__ on the wrapper.
    @functools.wraps(func)
    def wrapper(args):
        if which('oc') is None:
            print('oc is required for this command.')
            return 1

        return func(args)
    return wrapper
153
154
155
def needs_working_cluster(func):
    """Decorate a sub-command handler so it only runs with a reachable cluster.

    The returned wrapper runs ``oc whoami`` first. If that command exits
    non-zero, the wrapper prints an error plus the command output and returns
    the command's exit status without calling ``func``; otherwise it returns
    whatever ``func(args)`` returns.
    """
    # wraps() keeps the handler's __name__/__doc__ on the wrapper.
    @functools.wraps(func)
    def wrapper(args):
        ret_code, output = subprocess.getstatusoutput(
            'oc whoami')
        if ret_code != 0:
            print("* Error connecting to cluster")
            print(output)
            return ret_code

        return func(args)
    return wrapper
166
167
def which(program):
    """Locate an executable, similar to the shell's ``which``.

    If ``program`` contains a directory component it is checked directly;
    otherwise each directory listed in the PATH environment variable is
    searched in order.

    Returns the path of the first candidate that is a regular file with
    execute permission, or None when nothing matches.
    """
    def is_executable(candidate):
        return os.path.isfile(candidate) and os.access(candidate, os.X_OK)

    directory, _ = os.path.split(program)
    if directory:
        # Test the program itself, not the directory that contains it.
        return program if is_executable(program) else None

    # Fall back to os.defpath when PATH is unset instead of raising KeyError.
    for path in os.environ.get("PATH", os.defpath).split(os.pathsep):
        exe_file = os.path.join(path, program)
        if is_executable(exe_file):
            return exe_file

    return None
179
180
181
def _api_path_from_oc_debug(output):
    """Extract an object's API path from ``oc get ... --loglevel=6`` output.

    Only the first line containing "GET" is considered. The URL is taken from
    the sixth space-separated field of that line and cut down to the part
    starting at its last "/api". Returns None when no such line exists, the
    line has too few fields, or the URL contains no "/api".
    """
    for line in output.splitlines():
        if 'GET' not in line:
            continue
        fields = line.split(" ")
        if len(fields) <= 5:
            # Unexpected debug format; report "not found" instead of raising.
            return None
        full_url = fields[5]
        api_index = full_url.rfind("/api")
        if api_index == -1:
            return None
        return full_url[api_index:]
    return None


@needs_oc
def createFunc(args):
    """Handle the 'create' sub-command: write rule.yml for a new rule.

    Reads from ``args``: url, namespace, group, rule, type, name, yamlpath,
    match, title, severity, identifiers, description, negate, regex,
    check_existence and match_entity.

    Unless ``args.url`` is given, the object's API path is discovered by
    running ``oc get <type>/<name>`` with debug logging (up to 5 attempts).

    Returns 0 after writing the file, or 1 if the group directory does not
    exist or the URL could not be determined.
    """
    url = args.url
    namespace_flag = ''
    if args.namespace is not None:
        namespace_flag = '-n ' + args.namespace

    group_path = os.path.join(PLATFORM_RULE_DIR, args.group)
    if args.group and not os.path.isdir(group_path):
        print("ERROR: The specified group '%s' doesn't exist in the '%s' directory" % (
            args.group, PLATFORM_RULE_DIR))
        # This is a failure, so report it through the exit status.
        return 1

    rule_path = os.path.join(group_path, args.rule)

    retries = 0
    while url is None and retries < 5:
        retries += 1
        ret_code, output = subprocess.getstatusoutput(
            'oc get %s/%s %s --loglevel=6' % (args.type, args.name, namespace_flag))
        if ret_code != 0:
            print('error running oc, check connection to the cluster: %d\n %s' % (
                ret_code, output))
            continue

        # extract the object url from the debug output
        url = _api_path_from_oc_debug(output)

    if url is None:
        print('there was a problem finding the URL from the oc debug output. Hint: override this automatic check with --url')
        return 1

    print('* Creating check for "%s" with yamlpath "%s" satisfying match of "%s"' % (
        url, args.yamlpath, args.match))
    rule_yaml_path = os.path.join(rule_path, 'rule.yml')

    pathlib.Path(rule_path).mkdir(parents=True, exist_ok=True)
    with open(rule_yaml_path, 'w') as f:
        # NOTE(review): IDENT and NEGATE are passed for completeness, but
        # RULE_TEMPLATE has no matching placeholders, so str.format() ignores
        # them and --identifiers/--negate do not affect the written file.
        f.write(RULE_TEMPLATE.format(URL=url, TITLE=args.title, SEV=args.severity, IDENT=args.identifiers,
                                     DESC=args.description, YAMLPATH=args.yamlpath, MATCH=args.match,
                                     NEGATE=str(args.negate).lower(),
                                     CHECK_TYPE=operation_value(args.regex),
                                     CHECK_EXISTENCE=check_existence_value(args.check_existence),
                                     ENTITY_CHECK=entity_value(args.match_entity)))
    print('* Wrote ' + rule_yaml_path)
    return 0
240
241
242
def createTestProfile(rule):
    """Write a profile to PROFILE_PATH that selects only ``rule``.

    Any existing file at PROFILE_PATH is overwritten.
    """
    # create a solo profile for rule
    with open(PROFILE_PATH, 'w') as f:
        f.write(PROFILE_TEMPLATE.format(RULE_NAME=rule))
246
247
248
def _remove_test_profile():
    """Delete the file at PROFILE_PATH, ignoring any OSError."""
    try:
        os.remove(PROFILE_PATH)
    except OSError:
        # Best effort: the profile may never have been created.
        pass


@needs_oc
@needs_working_cluster
def clusterTestFunc(args):
    """Handle the 'cluster-test' sub-command: scan one rule in the cluster.

    Reads ``args.rule``, ``args.skip_deploy`` and ``args.skip_build``.

    Steps: check that a rule directory exists, optionally run the
    compliance-operator deploy script, optionally build and push the content,
    delete any previous ``compliancescans/test``, apply a new scan object and
    poll its status for up to 120 seconds.

    Returns 0 once the scan reports DONE (whatever its result), or 1 when the
    rule is missing, the build or ``oc apply`` fails, or polling times out.
    """
    print('* Testing rule %s in-cluster' % args.rule)

    # Only stdout is inspected, so an error message from find (for example a
    # missing search directory) is not mistaken for a match.
    find_proc = subprocess.run(
        ['find', PLATFORM_RULE_DIR, '-name', args.rule, '-type', 'd'],
        stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
        universal_newlines=True)
    if find_proc.stdout.strip() == "":
        print('ERROR: no rule for %s, run "create" first' % args.rule)
        return 1

    if not args.skip_deploy:
        subprocess.run("utils/deploy_compliance_operator.sh")

    if not args.skip_build:
        createTestProfile(args.rule)
        print('* Pushing image build to cluster')
        # execute the build_ds_container script
        buildp = subprocess.run(
            ['utils/build_ds_container.sh', '-P', 'ocp4', '-P', 'rhcos4'])
        if buildp.returncode != 0:
            _remove_test_profile()
            return 1

    ret_code, _ = subprocess.getstatusoutput(
        'oc delete compliancescans/test')
    if ret_code == 0:
        # if previous compliancescans were actually deleted, wait a bit to allow resources to clean up.
        print('* Waiting for cleanup from a previous test run')
        time.sleep(20)

    # create a single-rule scan
    print("* Running scan with rule '%s'" % args.rule)
    profile = 'xccdf_org.ssgproject.content_profile_test'
    apply_cmd = ['oc', 'apply', '-f', '-']
    # stderr must be piped too, otherwise communicate() returns None for it
    # and the error message below carries no information.
    with subprocess.Popen(apply_cmd, stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE) as proc:
        _, err = proc.communicate(
            input=TEST_SCAN_TEMPLATE.format(PROFILE=profile).encode())
        if proc.returncode != 0:
            print('Error applying scan object: %s' %
                  err.decode(errors='replace').strip())
            _remove_test_profile()
            return 1

    # poll for the DONE result
    timeout = time.time() + 120   # A couple of minutes is generous for the platform scan.
    scan_result = None
    while True:
        _, output = subprocess.getstatusoutput(
            'oc get compliancescans/test -o template="{{.status.phase}} {{.status.result}}"')
        print('> Output from last phase check: %s' % output)
        if output.startswith('DONE'):
            # Output has the form "<phase> <result>"; drop the "DONE " prefix.
            scan_result = output[5:]
            break
        if time.time() >= timeout:
            break
        time.sleep(2)

    if scan_result is None:
        print('ERROR: Timeout waiting for scan to finish')
        return 1

    print("* The result is '%s'" % scan_result)
    return 0
319
320
321
def testFunc(args):
    """Handle the 'test' sub-command: scan one rule locally with podman.

    Reads ``args.rule``, ``args.skip_build``, ``args.objectdir``,
    ``args.contentdir`` and ``args.verbosity``.

    Unless the build is skipped, a single-rule profile is written and
    ``./build_product --datastream-only ocp4`` is run. The oscap container is
    then started with the content and object directories mounted, and its
    output is printed.

    Returns 1 when podman is missing or the build fails; otherwise returns
    None.
    """
    if which('podman') is None:
        print('podman is required')
        return 1

    print('testing rule %s locally' % args.rule)

    if not args.skip_build:
        createTestProfile(args.rule)
        ret_code, out = subprocess.getstatusoutput('./build_product --datastream-only ocp4')
        if ret_code != 0:
            print('build failed: %s' % out)
            return 1

    # mock a passing result for the implicit ocp4 version check
    # (only when the clusteroperators directory does not exist yet)
    version_dir = args.objectdir + '/apis/config.openshift.io/v1/clusteroperators'
    if not os.path.exists(version_dir):
        pathlib.Path(version_dir).mkdir(parents=True, exist_ok=True)
        with open(version_dir + '/openshift-apiserver', 'w') as f:
            f.write(MOCK_VERSION)

    oscap_cmd_opts = OSCAP_CMD_TEMPLATE % (args.verbosity)
    pod_cmd = 'podman run -it --security-opt label=disable -v "%s:/content" -v "%s:/kubernetes-api-resources" %s %s' % (
        args.contentdir, args.objectdir, OSCAP_TEST_IMAGE, oscap_cmd_opts)
    print(subprocess.getoutput(pod_cmd))
346
347
348
def main():
    """Parse the command line and dispatch to the chosen sub-command.

    Defines the 'create', 'cluster-test' and 'test' sub-commands, each bound
    to its handler through ``set_defaults(func=...)``.

    Returns the handler's return value, or 1 (after printing the help text)
    when no sub-command was given.
    """
    parser = argparse.ArgumentParser(
        prog="add_platform_rule.py",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=textwrap.dedent(PROG_DESC))
    subparser = parser.add_subparsers(
        dest='subcommand', title='subcommands', help='pick one')
    create_parser = subparser.add_parser(
        'create', help='Bootstrap the XML and YML files under %s for a new check.' % PLATFORM_RULE_DIR)
    create_parser.add_argument(
        '--rule', required=True, help='The name of the rule to create. Required.')
    create_parser.add_argument(
        '--group', default="", help='The group directory of the rule to create.')
    create_parser.add_argument(
        '--name', required=True, help='The name of the Kubernetes object to check. Required.')
    create_parser.add_argument(
        '--type', required=True, help='The type of Kubernetes object, e.g., configmap. Required.')
    create_parser.add_argument('--yamlpath', required=True,
                               help='The yaml-path of the element to match against.')
    create_parser.add_argument(
        '--match', required=True, help='A string value or regex providing the matching criteria. Required')
    create_parser.add_argument(
        '--namespace', help='The namespace of the Kubernetes object (optional for cluster-scoped objects)', default=None)
    create_parser.add_argument(
        '--title', help='A short description of the check.')
    create_parser.add_argument(
        '--url', help='The direct api path (metadata.selfLink) of the object, which overrides --type --name and --namespace options.')
    create_parser.add_argument(
        '--description', help='A human-readable description of the provided matching criteria.')
    create_parser.add_argument(
        '--regex', default=False, action="store_true", help='treat the --match value as a regex')
    create_parser.add_argument(
        '--match-entity', help='the entity_check value to apply, i.e., "all", "at least one", "none exist"')
    create_parser.add_argument(
        '--check-existence', help='The `check_existence` value for the `yamlfilecontent_test`.')
    create_parser.add_argument(
        '--negate', default=False, action="store_true", help='negate the given matching criteria (does NOT match). Default is false.')
    create_parser.add_argument(
        '--identifiers', default="TBD", help='an identifier for the rule (CCE number)')
    create_parser.add_argument(
        '--severity', default="unknown", help='the severity of the rule.')
    create_parser.set_defaults(func=createFunc)

    cluster_test_parser = subparser.add_parser(
        'cluster-test', help='Test a rule on a running OCP cluster using the compliance-operator.')
    cluster_test_parser.add_argument(
        '--rule', required=True, help='The name of the rule to test. Required.')
    cluster_test_parser.add_argument(
        '--skip-deploy', default=False, action="store_true", help='Skip deploying the compliance-operator. Default is to deploy.')
    cluster_test_parser.add_argument(
        '--skip-build', default=False, action="store_true", help='Skip building and pushing the datastream. Default is to build and push.')
    cluster_test_parser.set_defaults(func=clusterTestFunc)

    test_parser = subparser.add_parser(
        'test', help='Test a rule locally against a directory of mocked object files using podman and an oscap container.')
    test_parser.add_argument('--rule', required=True,
                             help='The name of the rule to test.')
    test_parser.add_argument(
        '--contentdir', default="./build", help='The path to the directory containing the datastream')
    test_parser.add_argument(
        '--skip-build', default=False, action="store_true", help='Skip building the datastream. Default is false.')
    test_parser.add_argument('--objectdir', default="/tmp",
                             help='The path to a directory structure of yaml objects to test against.')
    test_parser.add_argument('--verbosity', default="INFO",
                             choices=['INFO', 'DEVEL'],
                             help='How verbose should OpenScap be')
    test_parser.set_defaults(func=testFunc)

    args = parser.parse_args()

    # argparse does not require a sub-command by default, so running the
    # script without one leaves ``func`` unset; show the help instead of
    # failing with AttributeError.
    if not hasattr(args, 'func'):
        parser.print_help()
        return 1

    return args.func(args)
419
420
421
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    sys.exit(main())
423