utils.add_platform_rule.clusterTestFunc()   F

Complexity

Conditions 15

Size

Total Lines 71
Code Lines 58

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0

Metric  Value
cc      15
eloc    58
nop     1
dl      0
loc     71
rs      2.9998
c       0
b       0
f       0

How to fix

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
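As a minimal sketch of that advice, the block commented "poll for the DONE result" in clusterTestFunc() could become its own function, with the comment turned into the name. The helper name wait_for_scan_result and its get_status, clock and sleep parameters are assumptions made for illustration; they are not part of the analyzed script.

```python
import time


def wait_for_scan_result(get_status, timeout_seconds=120, interval=2,
                         clock=time.time, sleep=time.sleep):
    """Poll get_status() until it reports DONE; return the result or None.

    get_status is a hypothetical callable returning a string such as
    'RUNNING NOT-AVAILABLE' or 'DONE COMPLIANT'.
    """
    deadline = clock() + timeout_seconds
    while True:
        output = get_status()
        if output.startswith('DONE'):
            # Drop the 'DONE ' prefix and keep only the scan result.
            return output[len('DONE '):]
        if clock() >= deadline:
            return None
        sleep(interval)


# Usage with canned statuses instead of a real cluster.
statuses = iter(['LAUNCHING NOT-AVAILABLE', 'RUNNING NOT-AVAILABLE', 'DONE COMPLIANT'])
result = wait_for_scan_result(lambda: next(statuses), sleep=lambda seconds: None)
print(result)
```

Passing the status source, clock and sleep function as arguments keeps the loop testable without a cluster; the caller would supply a function that runs the real oc command.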

The refactoring most commonly applied in this situation is Extract Method.

Complexity

Complex classes, and long functions such as utils.add_platform_rule.clusterTestFunc(), often do a lot of different things. To break such a unit down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields, methods, or variables that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
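For a function rather than a class, the same idea applies on a smaller scale: statements that always travel together can be pulled out. In clusterTestFunc() the try / os.remove(PROFILE_PATH) / except OSError sequence appears in two failure branches. The sketch below factors it into one helper; the name remove_if_exists is an assumption for illustration and does not exist in the analyzed script.

```python
import os
import tempfile


def remove_if_exists(path):
    """Delete path, ignoring the OSError raised when it cannot be removed."""
    try:
        os.remove(path)
    except OSError:
        pass


# Usage: the second call is a no-op because the file is already gone.
handle, scratch_path = tempfile.mkstemp()
os.close(handle)
remove_if_exists(scratch_path)
remove_if_exists(scratch_path)
print(os.path.exists(scratch_path))
```

Each failure branch then shrinks to a single call followed by return 1, which removes two nested try blocks from the function body.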

#!/usr/bin/python3

import argparse
import subprocess
import sys
import textwrap
import os
import time

from ssg.utils import mkdir_p


PROG_DESC = (''' Create and test content files for Kubernetes API checks.

This script is intended to help content writers create a new application check
for OCP4/Kubernetes.

- The 'create' subcommand creates the initial files for a new rule and fetches
  the raw URL of the object in question (unless you specify the URL).

- The 'test' subcommand builds your content locally and tests directly using an
  openscap podman container. The scan container will test against yaml files
  staged under --objectdir.

- The 'cluster-test' subcommand pushes the content to your cluster, and then
  runs a Platform scan for your rule with compliance-operator.

Example workflow:

$ utils/add_platform_rule.py create --rule=ocp_proxy_has_ca \
  --type="proxies.config" --name="cluster" \
  --yamlpath=".spec.trustedCA.name" --match="[a-zA-Z0-9]*"
creating check for "/apis/config.openshift.io/v1/proxies/cluster" with yamlpath ".spec.trustedCA.name" satisfying match of "[a-zA-Z0-9]*"
wrote applications/openshift/ocp_proxy_has_ca/rule.yml

$ mkdir -p /tmp/apis/config.openshift.io/v1/proxies/
$ oc get proxies.config/cluster -o yaml > /tmp/apis/config.openshift.io/v1/proxies/cluster
$ utils/add_platform_rule.py test --rule=ocp_proxy_has_ca
testing rule ocp_proxy_has_ca locally
Title
        None
Rule
        xccdf_org.ssgproject.content_rule_ocp_proxy_has_ca
Ident
        CCE-84209-6
Result
        pass

$ utils/add_platform_rule.py cluster-test --rule=ocp_proxy_has_ca
testing rule ocp_proxy_has_ca in-cluster
deploying compliance-operator
pushing image build to cluster
waiting for cleanup from previous test run
output from last phase check: LAUNCHING NOT-AVAILABLE
output from last phase check: RUNNING NOT-AVAILABLE
output from last phase check: AGGREGATING NOT-AVAILABLE
output from last phase check: DONE COMPLIANT
COMPLIANT

''')

PLATFORM_RULE_DIR = 'applications/openshift'
OSCAP_TEST_IMAGE = 'quay.io/compliance-operator/openscap-ocp:1.3.4'
OSCAP_CMD_TEMPLATE = 'oscap xccdf eval --verbose %s --fetch-remote-resources --profile xccdf_org.ssgproject.content_profile_test --results-arf /tmp/report-arf.xml /content/ssg-ocp4-ds.xml'
PROFILE_PATH = 'products/ocp4/profiles/test.profile'

MOCK_VERSION = ('''status:
  versions:
  - name: operator
    version: 4.6.0-0.ci-2020-06-15-112708
  - name: openshift-apiserver
    version: 4.6.0-0.ci-2020-06-15-112708
''')

RULE_TEMPLATE = ('''prodtype: ocp4

title: {TITLE}

description: {DESC}

rationale: TBD

identifiers: {{}}

severity: {SEV}

warnings:
- general: |-
    {{{{{{ openshift_cluster_setting("{URL}") | indent(4) }}}}}}

template:
  name: yamlfile_value
  vars:
    ocp_data: "true"{ENTITY_CHECK}{CHECK_EXISTENCE}
    filepath: {URL}
    yamlpath: "{YAMLPATH}"
    values:
    - value: "{MATCH}"{CHECK_TYPE}
''')


def operation_value(value):
    if value:
        return '\n      operation: "pattern match"\n      type: "string"'
    else:
        return ''


def entity_value(value):
    if value is not None:
        return '\n    entity_check: "%s"' % value
    else:
        return ''


def check_existence_value(value):
    if value is not None:
        return '\n    check_existence: "%s"' % value
    else:
        return ''


PROFILE_TEMPLATE = ('''documentation_complete: true

title: 'Test Profile for {RULE_NAME}'

platform: ocp4

description: Test Profile
selections:
- {RULE_NAME}
''')


TEST_SCAN_TEMPLATE = ('''apiVersion: compliance.openshift.io/v1alpha1
kind: ComplianceScan
metadata:
  name: test
spec:
  scanType: {TYPE}
  profile: {PROFILE}
  content: ssg-ocp4-ds.xml
  contentImage: image-registry.openshift-image-registry.svc:5000/openshift-compliance/openscap-ocp4-ds:latest
  debug: true
''')


def needs_oc(func):
    # Decorator: refuse to run the subcommand when the oc binary is missing.
    def wrapper(args):
        if which('oc') is None:
            print('oc is required for this command.')
            return 1

        return func(args)
    return wrapper


def needs_working_cluster(func):
    # Decorator: refuse to run the subcommand when 'oc whoami' fails.
    def wrapper(args):
        ret_code, output = subprocess.getstatusoutput(
            'oc whoami')
        if ret_code != 0:
            print("* Error connecting to cluster")
            print(output)
            return ret_code

        return func(args)
    return wrapper


def which(program):
    fpath, fname = os.path.split(program)
    if fpath:
        # An explicit path was given: check the program itself, not its directory.
        if os.path.isfile(program) and os.access(program, os.X_OK):
            return program
    else:
        for path in os.environ["PATH"].split(os.pathsep):
            exe_file = os.path.join(path, program)
            if os.path.isfile(exe_file) and os.access(exe_file, os.X_OK):
                return exe_file

    return None


@needs_oc
def createFunc(args):
    url = args.url
    retries = 0
    namespace_flag = ''
    if args.namespace is not None:
        namespace_flag = '-n ' + args.namespace
    elif args.all_namespaces:
        namespace_flag = '-A'

    group_path = os.path.join(PLATFORM_RULE_DIR, args.group)
    if args.group:
        if not os.path.isdir(group_path):
            print("ERROR: The specified group '%s' doesn't exist in the '%s' directory" % (
                args.group, PLATFORM_RULE_DIR))
            # Report the failure through a non-zero exit code.
            return 1

    rule_path = os.path.join(group_path, args.rule)
    # Unless --url was given, ask oc for the object and read the URL from its debug log.
    while url is None and retries < 5:
        retries += 1
        cmdstr = 'oc get %s' % (args.type)

        if args.name:
            cmdstr += ' ' + args.name

        cmdstr += ' %s --loglevel=6' % (namespace_flag)

        print("Running: " + cmdstr)
        ret_code, output = subprocess.getstatusoutput(cmdstr)

        if ret_code != 0:
            print('error running oc, check connection to the cluster: %d\n %s' % (
                ret_code, output))
            continue

        fetch_line = ""
        url_part = ""
        lines = output.splitlines()
        for line in lines:
            if 'GET' in line:
                fetch_line = line
                break

        if len(fetch_line) > 0:
            # extract the object url from the debug line
            full_url = fetch_line[fetch_line.index("GET"):].split(" ")[1]
            url_part = full_url[full_url.rfind("/api"):]

        if len(url_part) > 0 and '/api' in url_part:
            url = url_part

    if url is None:
        print('there was a problem finding the URL from the oc debug output. Hint: override this automatic check with --url')
        return 1

    print('* Creating check for "%s" with yamlpath "%s" satisfying match of "%s"' % (
        url, args.yamlpath, args.match))
    rule_yaml_path = os.path.join(rule_path, 'rule.yml')

    mkdir_p(rule_path)
    with open(rule_yaml_path, 'w') as f:
        f.write(RULE_TEMPLATE.format(URL=url, TITLE=args.title, SEV=args.severity, IDENT=args.identifiers,
                                     DESC=args.description, YAMLPATH=args.yamlpath, MATCH=args.match,
                                     NEGATE=str(args.negate).lower(),
                                     CHECK_TYPE=operation_value(args.regex),
                                     CHECK_EXISTENCE=check_existence_value(args.check_existence),
                                     ENTITY_CHECK=entity_value(args.match_entity)))
    print('* Wrote ' + rule_yaml_path)
    return 0


def createTestProfile(rule):
    # create a solo profile for rule
    with open(PROFILE_PATH, 'w') as f:
        f.write(PROFILE_TEMPLATE.format(RULE_NAME=rule))


@needs_oc
@needs_working_cluster
def clusterTestFunc(args):
    print('* Testing rule %s in-cluster' % args.rule)

    findout = subprocess.getoutput(
        "find %s -name '%s' -type d" % (PLATFORM_RULE_DIR, args.rule))
    if findout == "":
        print('ERROR: no rule for %s, run "create" first' % args.rule)
        return 1

    if not args.skip_deploy:
        subprocess.run("utils/deploy_compliance_operator.sh")

    if not args.skip_build:
        createTestProfile(args.rule)
        print('* Pushing image build to cluster')
        # execute the build_ds_container script
        buildp = subprocess.run(
            ['utils/build_ds_container.sh', '-P', 'ocp4', '-P', 'rhcos4'])
        if buildp.returncode != 0:
            try:
                os.remove(PROFILE_PATH)
            except OSError:
                pass
            return 1

    ret_code, _ = subprocess.getstatusoutput(
        'oc delete compliancescans/test')
    if ret_code == 0:
        # if previous compliancescans were actually deleted, wait a bit to allow resources to clean up.
        print('* Waiting for cleanup from a previous test run')
        time.sleep(20)

    # create a single-rule scan
    print("* Running scan with rule '%s'" % args.rule)
    profile = 'xccdf_org.ssgproject.content_profile_test'
    apply_cmd = ['oc', 'apply', '-f', '-']
    # stderr must be piped as well, otherwise communicate() returns None for it.
    with subprocess.Popen(apply_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE) as proc:
        _, err = proc.communicate(
            input=TEST_SCAN_TEMPLATE.format(PROFILE=profile, TYPE=args.scantype).encode())
        if proc.returncode != 0:
            print('Error applying scan object: %s' % err.decode())
            try:
                os.remove(PROFILE_PATH)
            except OSError:
                pass
            return 1

    # poll for the DONE result
    timeout = time.time() + 120   # A couple of minutes is generous for the platform scan.
    scan_result = None
    while True:
        ret_code, output = subprocess.getstatusoutput(
            'oc get compliancescans/test -o template="{{.status.phase}} {{.status.result}}"')
        if output is not None:
            print('> Output from last phase check: %s' % output)
        if output.startswith('DONE'):
            # strip the leading 'DONE ' and keep the result
            scan_result = output[5:]
            break
        if time.time() >= timeout:
            break
        time.sleep(2)

    if scan_result is None:
        print('ERROR: Timeout waiting for scan to finish')
        return 1

    print("* The result is '%s'" % scan_result)
    return 0


def testFunc(args):
    if which('podman') is None:
        print('podman is required')
        return 1

    print('testing rule %s locally' % args.rule)

    if not args.skip_build:
        createTestProfile(args.rule)
        ret_code, out = subprocess.getstatusoutput('./build_product --datastream-only ocp4')
        if ret_code != 0:
            print('build failed: %s' % out)
            return 1

    # mock a passing result for the implicit ocp4 version check
    version_dir = args.objectdir + '/apis/config.openshift.io/v1/clusteroperators'
    mock_version_file = os.path.join(version_dir, 'openshift-apiserver')
    if not os.path.exists(mock_version_file):
        mkdir_p(version_dir)
        with open(mock_version_file, 'w') as f:
            f.write(MOCK_VERSION)

    oscap_cmd_opts = OSCAP_CMD_TEMPLATE % (args.verbosity)
    pod_cmd = 'podman run -it --security-opt label=disable -v "%s:/content" -v "%s:/kubernetes-api-resources" %s %s' % (
        args.contentdir, args.objectdir, OSCAP_TEST_IMAGE, oscap_cmd_opts)
    print(subprocess.getoutput(pod_cmd))


def main():
    parser = argparse.ArgumentParser(
        prog="add_platform_rule.py",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=textwrap.dedent(PROG_DESC))
    subparser = parser.add_subparsers(
        dest='subcommand', title='subcommands', help='pick one')
    create_parser = subparser.add_parser(
        'create', help='Bootstrap the XML and YML files under %s for a new check.' % PLATFORM_RULE_DIR)
    create_parser.add_argument(
        '--rule', required=True, help='The name of the rule to create. Required.')
    create_parser.add_argument(
        '--group', default="", help='The group directory of the rule to create.')
    create_parser.add_argument(
        '--name', help='The name of the Kubernetes object to check.')
    create_parser.add_argument(
        '--type', required=True, help='The type of Kubernetes object, e.g., configmap. Required.')
    create_parser.add_argument('--yamlpath', required=True,
                               help='The yaml-path of the element to match against. Required.')
    create_parser.add_argument(
        '--match', required=True, help='A string value or regex providing the matching criteria. Required.')
    create_parser.add_argument(
        '--namespace', help='The namespace of the Kubernetes object (optional for cluster-scoped objects)', default=None)
    create_parser.add_argument(
        '--all-namespaces', action="store_true", help='Look up the Kubernetes object across all namespaces (passes -A to oc).',
        default=False)
    create_parser.add_argument(
        '--title', help='A short description of the check.')
    create_parser.add_argument(
        '--url', help='The direct api path (metadata.selfLink) of the object, which overrides --type --name and --namespace options.')
    create_parser.add_argument(
        '--description', help='A human-readable description of the provided matching criteria.')
    create_parser.add_argument(
        '--regex', default=False, action="store_true", help='treat the --match value as a regex')
    create_parser.add_argument(
        '--match-entity', help='the entity_check value to apply, i.e., "all", "at least one", "none exist"')
    create_parser.add_argument(
        '--check-existence', help='`check_existence` value for the `yamlfilecontent_test`.')
    create_parser.add_argument(
        '--negate', default=False, action="store_true", help='negate the given matching criteria (does NOT match). Default is false.')
    create_parser.add_argument(
        '--identifiers', default="TBD", help='an identifier for the rule (CCE number)')
    create_parser.add_argument(
        '--severity', default="unknown", help='the severity of the rule.')
    create_parser.set_defaults(func=createFunc)

    cluster_test_parser = subparser.add_parser(
        'cluster-test', help='Test a rule on a running OCP cluster using the compliance-operator.')
    cluster_test_parser.add_argument(
        '--rule', required=True, help='The name of the rule to test. Required.')
    cluster_test_parser.add_argument(
        '--skip-deploy', default=False, action="store_true", help='Skip deploying the compliance-operator. Default is to deploy.')
    cluster_test_parser.add_argument(
        '--skip-build', default=False, action="store_true", help='Skip building and pushing the datastream. Default is to build.')
    cluster_test_parser.add_argument(
        '--scan-type', help='Type of scan to execute.', dest="scantype",
        default="Platform",
        choices=["Node", "Platform"])
    cluster_test_parser.set_defaults(func=clusterTestFunc)

    test_parser = subparser.add_parser(
        'test', help='Test a rule locally against a directory of mocked object files using podman and an oscap container.')
    test_parser.add_argument('--rule', required=True,
                             help='The name of the rule to test.')
    test_parser.add_argument(
        '--contentdir', default="./build", help='The path to the directory containing the datastream')
    test_parser.add_argument(
        '--skip-build', default=False, action="store_true", help='Skip building the datastream. Default is false.')
    test_parser.add_argument('--objectdir', default="/tmp",
                             help='The path to a directory structure of yaml objects to test against.')
    test_parser.add_argument('--verbosity', default="INFO",
                             choices=['INFO', 'DEVEL'],
                             help='How verbose should OpenScap be')
    test_parser.set_defaults(func=testFunc)

    args = parser.parse_args()

    # Without a subcommand there is no 'func' attribute; show the help instead of crashing.
    if not hasattr(args, 'func'):
        parser.print_help()
        return 1

    return args.func(args)


if __name__ == "__main__":
    sys.exit(main())
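
The hand-written which() helper and the needs_oc decorator in the listing could lean on the standard library instead. The sketch below uses shutil.which (available since Python 3.3) and functools.wraps; the decorator name needs_tool and the placeholder program name are assumptions for illustration, not part of the analyzed script.

```python
import functools
import shutil


def needs_tool(name):
    """Decorator factory: return exit code 1 when name is not found on PATH."""
    def decorator(func):
        @functools.wraps(func)  # keep the wrapped function's name and docstring
        def wrapper(args):
            if shutil.which(name) is None:
                print('%s is required for this command.' % name)
                return 1
            return func(args)
        return wrapper
    return decorator


# Hypothetical program name, chosen so that it is almost certainly not installed.
@needs_tool('hypothetical-tool-that-should-not-exist')
def example_command(args):
    return 0


print(example_command(None))
```

Assuming no program with that placeholder name is installed, the call reports the missing tool and returns 1 without running the command body. One decorator factory would then cover both the oc and podman checks.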