ssg.build_sce.checks()   F
last analyzed

Complexity

Conditions 21

Size

Total Lines 138
Code Lines 76

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 462

Importance

Changes 0
Metric Value
cc 21
eloc 76
nop 5
dl 0
loc 138
ccs 0
cts 73
cp 0
crap 462
rs 0
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: move a cohesive group of statements into its own well-named function.

Complexity

Complex units like ssg.build_sce.checks() often do a lot of different things. Although ssg.build_sce.checks() is a function rather than a class, the same advice applies: to break it down, we need to identify a cohesive component within it. A common approach to find such a component is to look for variables or statements that share the same prefixes or suffixes, or that operate on the same data.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
from __future__ import absolute_import
2
from __future__ import print_function
3
4
import os
5
import os.path
6
import json
7
import sys
8
9
from .build_yaml import Rule, DocumentationNotComplete
10
from .constants import (
11
    MULTI_PLATFORM_LIST, OSCAP_VALUE, datastream_namespace,
12
    xlink_namespace, XCCDF12_NS, SCE_SYSTEM
13
)
14
from .jinja import process_file_with_macros
15
from .rule_yaml import parse_prodtype
16
from .rules import get_rule_dir_id, get_rule_dir_sces, find_rule_dirs_in_paths
17
from . import utils, products
18
19
20
def load_sce_and_metadata(file_path, local_env_yaml):
    """
    Render the SCE audit file at file_path, expanding Jinja macros under the
    environment local_env_yaml, and then split the rendered text into the
    script body and the metadata headers it carries. When the same metadata
    keyword appears several times, the last occurrence is the one kept.

    Returns (audit_content, metadata).
    """
    return load_sce_and_metadata_parsed(
        process_file_with_macros(file_path, local_env_yaml)
    )
32
33
34
def load_sce_and_metadata_parsed(raw_content):
    """
    Split already-rendered SCE script text into its body and its metadata.

    A metadata header is a line starting with '# <keyword> = ', where keyword
    is one of platform, check-import, check-export or complex-check. Header
    lines are removed from the returned body; every other line is kept in
    order. A later header for the same keyword replaces an earlier one.

    Post-processing of the collected values:
      * 'check-export' becomes a list of 'name=' + OSCAP_VALUE + 'variable'
        strings, one per comma-separated 'name=variable' entry.
      * 'platform' becomes the list of its comma-separated entries.

    Returns (sce_content, metadata).
    """
    header_keys = ['platform', 'check-import', 'check-export', 'complex-check']
    metadata = {}
    body_lines = []

    for text in raw_content.split("\n"):
        matched_keys = [
            key for key in header_keys
            if text.startswith('# ' + key + ' = ')
        ]
        if not matched_keys:
            body_lines.append(text)
            continue

        # Drop the leading comment marker, then keep what follows the
        # first '=' (which the prefix test above guarantees is present).
        raw_value = text[2:].partition('=')[2]
        for key in matched_keys:
            metadata[key] = raw_value.strip()

    if 'check-export' in metadata:
        # Variables exposed to the SCE script are referenced through the
        # OSCAP_VALUE prefix, so prepend it to each exported variable.
        metadata['check-export'] = [
            name + '=' + OSCAP_VALUE + variable
            for name, variable in (
                entry.split('=', maxsplit=1)
                for entry in metadata['check-export'].split(',')
            )
        ]

    if 'platform' in metadata:
        metadata['platform'] = metadata['platform'].split(',')

    return "\n".join(body_lines), metadata
67
68
69
def _check_is_applicable_for_product(metadata, product):
70
    """
71
    Validates whether or not the specified check is applicable for this
72
    product. Different from build_ovals.py in that this operates directly
73
    on the parsed metadata and doesn't have to deal with matching XML
74
    elements.
75
    """
76
77
    if 'platform' not in metadata:
78
        return True
79
80
    product, product_version = utils.parse_name(product)
81
82
    multi_product = 'multi_platform_{0}'.format(product)
83
    if product in ['macos', 'ubuntu']:
84
        product_version = product_version[:2] + "." + product_version[2:]
85
86
    return ('multi_platform_all' in metadata['platform'] or
87
            (multi_product in metadata['platform'] and
88
             product in MULTI_PLATFORM_LIST) or
89
            (product in metadata['platform'] and
90
             product_version in metadata['platform']))
91
92
93
def _check_is_loaded(already_loaded, filename):
94
    # Right now this check doesn't look at metadata or anything
95
    # else. Eventually we might add versions to the entry or
96
    # something.
97
    return filename in already_loaded
98
99
100
def _store_sce_check(output, filename, sce_content, metadata, rule_id, already_loaded):
    """
    Write sce_content to the file `filename` inside the `output` directory and
    record `metadata` under `rule_id` in `already_loaded`, so that sources
    processed later skip this rule.
    """
    with open(os.path.join(output, filename), 'w') as output_file:
        print(sce_content, file=output_file)
    already_loaded[rule_id] = metadata


def checks(env_yaml, yaml_path, sce_dirs, template_builder, output):
    """
    Walks the build system and builds all SCE checks (and metadata entry)
    into the output directory.

    Sources are processed in priority order; the first check stored for a
    rule ID wins and later candidates for the same ID are skipped:
      1. SCE files found in rule directories under the product's benchmark
         root and any additional content directories;
      2. templated 'sce-bash' content of those same rules;
      3. files in the shared sce_dirs, visited in reverse order.

    Parameters:
      env_yaml: build environment mapping; must provide "product" and
        "benchmark_root", may provide "additional_content_directories".
      yaml_path: path handed to products.Product to load the product data.
      sce_dirs: sequence of shared SCE directories; entries that are not
        directories are ignored.
      template_builder: object providing get_resolved_langs_to_generate(rule)
        and get_lang_contents_for_templatable(rule, lang).
      output: directory receiving the check scripts and 'metadata.json'.

    Returns the dict mapping each included rule ID to its check metadata
    (the same data that is written to 'metadata.json').
    """
    product = utils.required_key(env_yaml, "product")
    reversed_dirs = sce_dirs[::-1]
    already_loaded = dict()
    # Per-rule keys are added to a copy so env_yaml itself stays untouched.
    local_env_yaml = dict()
    local_env_yaml.update(env_yaml)

    # We maintain the same search structure as build_ovals.py even though we
    # don't currently have any content under shared/checks/sce.
    product_yaml = products.Product(yaml_path)
    product_dir = product_yaml["product_dir"]
    relative_guide_dir = utils.required_key(env_yaml, "benchmark_root")
    guide_dir = os.path.abspath(os.path.join(product_dir, relative_guide_dir))
    additional_content_directories = env_yaml.get("additional_content_directories", [])
    add_content_dirs = [
        os.path.abspath(os.path.join(product_dir, rd))
        for rd in additional_content_directories
    ]

    # First walk all rules under the product. These have higher priority than any
    # out-of-tree SCE checks.
    for _dir_path in find_rule_dirs_in_paths([guide_dir] + add_content_dirs):
        rule_id = get_rule_dir_id(_dir_path)

        rule_path = os.path.join(_dir_path, "rule.yml")
        try:
            rule = Rule.from_yaml(rule_path, env_yaml)
        except DocumentationNotComplete:
            # Happens on non-debug builds when a rule isn't yet completed. We
            # don't want to build the SCE check for this rule yet so skip it
            # and move on.
            continue

        prodtypes = parse_prodtype(rule.prodtype)
        if prodtypes and 'all' not in prodtypes and product not in prodtypes:
            # The prodtype exists, isn't all and doesn't contain this current
            # product, so we're best to skip this rule altogether.
            continue

        local_env_yaml['rule_id'] = rule.id_
        local_env_yaml['rule_title'] = rule.title
        local_env_yaml['products'] = prodtypes  # default is all

        for _path in get_rule_dir_sces(_dir_path, product):
            # To be compatible with later checks, name the output file after
            # the rule_id. Unlike build_oval.checks(...), we have to get this
            # script's extension first.
            _, ext = os.path.splitext(_path)
            filename = "{0}{1}".format(rule_id, ext)

            sce_content, metadata = load_sce_and_metadata(_path, local_env_yaml)
            metadata['filename'] = filename

            if not _check_is_applicable_for_product(metadata, product):
                continue
            if _check_is_loaded(already_loaded, rule_id):
                continue

            _store_sce_check(output, filename, sce_content, metadata,
                             rule_id, already_loaded)

        # Everything below handles templated content only.
        if not rule.template:
            continue

        langs = template_builder.get_resolved_langs_to_generate(rule)
        if 'sce-bash' not in langs:
            continue

        # Here we know the specified rule has a template and this template
        # actually generates (bash) SCE content. Bespoke SCE content is
        # prioritized over templated content, so skip the template when a
        # check for this rule has already been stored.
        if _check_is_loaded(already_loaded, rule_id):
            continue

        # Render the templated content so its SCE metadata can be parsed.
        raw_sce_content = template_builder.get_lang_contents_for_templatable(
            rule, langs['sce-bash']
        )

        filename = rule_id + '.sh'

        # Load metadata and infer correct file name.
        sce_content, metadata = load_sce_and_metadata_parsed(raw_sce_content)
        metadata['filename'] = filename

        # Skip the check if it isn't applicable for this product.
        if not _check_is_applicable_for_product(metadata, product):
            continue

        # Finally, include it in our loaded content.
        _store_sce_check(output, filename, sce_content, metadata,
                         rule_id, already_loaded)

    # Finally take any shared SCE checks and build them as well. Note that
    # there's no way for shorthand generation to include them if they do NOT
    # align with a particular rule_id, so it is suggested that the former
    # method be used.
    for sce_dir in reversed_dirs:
        if not os.path.isdir(sce_dir):
            continue

        for filename in sorted(os.listdir(sce_dir)):
            rule_id, _ = os.path.splitext(filename)

            # os.listdir() yields bare names, so the file has to be located
            # relative to the directory being listed rather than the current
            # working directory.
            _path = os.path.join(sce_dir, filename)
            sce_content, metadata = load_sce_and_metadata(_path, env_yaml)
            metadata['filename'] = filename

            if not _check_is_applicable_for_product(metadata, product):
                continue
            if _check_is_loaded(already_loaded, rule_id):
                continue

            _store_sce_check(output, filename, sce_content, metadata,
                             rule_id, already_loaded)

    # Finally, write out our metadata to disk so that we can reference it in
    # later build stages (such as during building shorthand content). The
    # context manager makes sure the file is flushed and closed.
    metadata_path = os.path.join(output, 'metadata.json')
    with open(metadata_path, 'w') as metadata_file:
        json.dump(already_loaded, metadata_file)

    return already_loaded
238
239
240
def collect_sce_checks(datastreamtree):
    """
    Retrieve the SCE checks and return a list of path to each check script.

    The checklist component is located through the first component-ref
    inside the data stream's checklists element. Within that component's
    Benchmark, the href of every check-content-ref that belongs to a Rule
    check whose system is SCE_SYSTEM is collected.
    """
    checklists_elem = datastreamtree.find(
        ".//{%s}checklists" % datastream_namespace)
    component_ref = checklists_elem.find(
        "{%s}component-ref" % datastream_namespace)
    href = component_ref.get('{%s}href' % xlink_namespace)
    # The component ID is the component-ref href without leading '#'
    component_id = href[1:]

    xpath_template = (
        ".//{{{ds_ns}}}component[@id='{cid}']/"
        "{{{xccdf_ns}}}Benchmark//"
        "{{{xccdf_ns}}}Rule/"
        "{{{xccdf_ns}}}check[@system='{sce_sys}']/"
        "{{{xccdf_ns}}}check-content-ref"
    )
    sce_refs_xpath = xpath_template.format(
        ds_ns=datastream_namespace,
        xccdf_ns=XCCDF12_NS,
        cid=component_id,
        sce_sys=SCE_SYSTEM,
    )

    # Extract the file paths of the SCE checks
    script_paths = []
    for content_ref in datastreamtree.findall(sce_refs_xpath):
        script_paths.append(content_ref.get('href'))
    return script_paths
264