move_patches_up_to_date_to_source_data_stream_component()   F
last analyzed

Complexity

Conditions 15

Size

Total Lines 91
Code Lines 63

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 240

Importance

Changes 0
Metric Value
cc 15
eloc 63
nop 1
dl 0
loc 91
ccs 0
cts 55
cp 0
crap 240
rs 2.9998
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like compose_ds.move_patches_up_to_date_to_source_data_stream_component() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/python3
2
3
import argparse
4
import os
5
import sys
6
import time
7
import xml.etree.ElementTree as ET
8
9
from ssg.build_sce import collect_sce_checks
10
from ssg.constants import (
11
    cat_namespace, datastream_namespace, oval_namespace, sce_namespace,
12
    XCCDF12_NS, xlink_namespace)
13
import ssg.xml
14
15
try:
16
    from urllib.parse import urlparse
17
except ImportError:
18
    from urlparse import urlparse
19
20
21
ID_NS = "org.open-scap"
22
component_ref_prefix = "#scap_org.open-scap_cref_"
23
24
25
# Inspired by openscap ds_sds_mangle_filepath() function
26
def mangle_path(path):
27
    path = path.replace('/', '-')
28
    path = path.replace('@', '-')
29
    path = path.replace('~', '-')
30
    return path
31
32
33
# From the list generated by collect_sce_checks, extract the path to the check content,
34
# and embed the script into the datastream
35
def embed_sce_checks_in_datastream(datastreamtree, checklists, sce_files, refdir):
36
    for file in sce_files:
37
        path = os.path.join(refdir, file)
38
        mangled_path = mangle_path(file)
39
40
        with open(path, 'rt', encoding='utf8') as fd:
41
            sce_script_content = fd.read()
42
43
        component_id = "scap_{}_ecomp_{}".format(ID_NS, mangled_path)
44
        component = ET.SubElement(
45
            datastreamtree, '{%s}extended-component' % datastream_namespace,
46
            attrib={
47
                'id': component_id,
48
                'timestamp': get_timestamp(path)
49
            })
50
        # Append the file content
51
        script_data = ET.SubElement(component, '{%s}script' % sce_namespace)
52
        script_data.text = sce_script_content
53
54
        # Create a component reference to map the checklist to the extended component
55
        component_ref_id = "scap_{}_cref_{}".format(ID_NS, mangled_path)
56
        component_ref = ET.SubElement(
57
            checklists, '{%s}component-ref' % datastream_namespace,
58
            attrib={
59
                'id': component_ref_id,
60
                ('{%s}href' % xlink_namespace): '#' + component_id
61
            })
62
63
        # Add the component reference to the catalog of XCCDF checklists
64
        checklists_component_ref = checklists.find(
65
            "{%s}component-ref" % datastream_namespace)
66
        catalog = checklists_component_ref.find('{%s}catalog' % cat_namespace)
67
        uri = ET.SubElement(
68
            catalog, '{%s}uri' % cat_namespace,
69
            attrib={
70
                'name': file,
71
                'uri': '#' + component_ref_id
72
            })
73
74
75
def move_patches_up_to_date_to_source_data_stream_component(datastreamtree):
76
    ds_checklists = datastreamtree.find(
77
        ".//{%s}checklists" % datastream_namespace)
78
    checklists_component_ref = ds_checklists.find(
79
        "{%s}component-ref" % datastream_namespace)
80
    checklists_component_ref_id = checklists_component_ref.get('id')
81
    # The component ID is the component-ref href without leading '#'
82
    checklists_component_id = checklists_component_ref.get(
83
        '{%s}href' % xlink_namespace)[1:]
84
85
    # Locate the <xccdf:check> element of an <xccdf:Rule> with id
86
    # security_patches_up_to_date
87
    checklist_component = None
88
    oval_check = None
89
    ds_components = datastreamtree.findall(
90
        ".//{%s}component" % datastream_namespace)
91
    for ds_component in ds_components:
92
        if ds_component.get('id') == checklists_component_id:
93
            checklist_component = ds_component
94
    if checklist_component is None:
95
        # Something strange happened
96
        sys.stderr.write(
97
            "Couldn't find <component> %s referenced by <component-ref> %s" % (
98
                checklists_component_id, checklists_component_ref_id))
99
        sys.exit(1)
100
101
    rules = checklist_component.findall(".//{%s}Rule" % XCCDF12_NS)
102
    for rule in rules:
103
        if rule.get('id').endswith('rule_security_patches_up_to_date'):
104
            rule_checks = rule.findall("{%s}check" % XCCDF12_NS)
105
            for check in rule_checks:
106
                if check.get('system') == oval_namespace:
107
                    oval_check = check
108
                    break
109
110
    if oval_check is None:
111
        # The component doesn't have a security patches up to date rule
112
        # with an OVAL check
113
        return
114
115
    # SCAP 1.3 demands multi-check true if the Rules
116
    # security_patches_up_to_date is evaluated by multiple OVAL patch class
117
    # definitinos. See 3.2.4.3, SCAP 1.3 standard (NIST.SP.800-126r3)
118
    oval_check.set('multi-check', 'true')
119
120
    check_content_ref = oval_check.find('{%s}check-content-ref' % XCCDF12_NS)
121
    href_url = check_content_ref.get('href')
122
123
    # Use URL's path to define the component name and URI
124
    # Path attribute returned from urlparse contains a leading '/', when
125
    # mangling it it will get replaced by '-'. Let's strip the '/' to avoid
126
    # a sequence of "_-" in the component-ref ID.
127
    component_ref_name = mangle_path(urlparse(href_url).path[1:])
128
    component_ref_uri = component_ref_prefix + component_ref_name
129
130
    # update @href to refer the datastream component name
131
    check_content_ref.set('href', component_ref_name)
132
133
    # Add a uri refering the component in Rule's Benchmark component-ref
134
    # catalog
135
    uri_exists = False
136
    catalog = checklists_component_ref.find('{%s}catalog' % cat_namespace)
137
    uris = catalog.findall("{%s}uri" % cat_namespace)
138
    for uri in uris:
139
        if uri.get('name') == component_ref_name:
140
            uri_exists = True
141
            return
142
    if not uri_exists:
143
        uri = ET.Element('{%s}uri' % cat_namespace)
144
        uri.set('name', component_ref_name)
145
        uri.set('uri', component_ref_uri)
146
        catalog.append(uri)
147
148
    # The component-ref ID is the catalog uri without leading '#'
149
    component_ref_feed_id = component_ref_uri[1:]
150
151
    # Add the component-ref to list of datastreams' checks
152
    check_component_ref_exists = False
153
    ds_checks = datastreamtree.find(".//{%s}checks" % datastream_namespace)
154
    check_component_refs = ds_checks.findall(
155
        "{%s}component-ref" % datastream_namespace)
156
    for check_component_ref in check_component_refs:
157
        if check_component_ref.get('id') == component_ref_feed_id:
158
            check_component_ref_exists = True
159
            return
160
    if not check_component_ref_exists:
161
        component_ref_feed = ET.Element(
162
            '{%s}component-ref' % datastream_namespace)
163
        component_ref_feed.set('id', component_ref_feed_id)
164
        component_ref_feed.set('{%s}href' % xlink_namespace, href_url)
165
        ds_checks.append(component_ref_feed)
166
167
168 View Code Duplication
def parse_args():
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
169
    parser = argparse.ArgumentParser(description="Compose an SCAP source data \
170
        stream from individual SCAP components")
171
    parser.add_argument("--xccdf", help="XCCDF 1.2 checklist file name")
172
    parser.add_argument("--oval", help="OVAL file name")
173
    parser.add_argument("--ocil", help="OCIL file name")
174
    parser.add_argument("--cpe-dict", help="CPE dictionary file name")
175
    parser.add_argument("--cpe-oval", help="CPE OVAL file name")
176
    parser.add_argument("--enable-sce", action='store_true', help="Enable building sce data")
177
    parser.add_argument(
178
        "--output-12", help="Output SCAP 1.2 source data stream file name")
179
    parser.add_argument(
180
        "--output-13", required=True,
181
        help="Output SCAP 1.3 source data stream file name")
182
    return parser.parse_args()
183
184
185
def get_timestamp(file_name):
186
    source_date_epoch = os.getenv("SOURCE_DATE_EPOCH")
187
    if source_date_epoch:
188
        time_sec = float(source_date_epoch)
189
    else:
190
        time_sec = os.path.getmtime(file_name)
191
    timestamp = time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(time_sec))
192
    return timestamp
193
194
195
def add_component(
196
        ds_collection, component_ref_parent, component_file_name,
197
        dependencies=None):
198
    component_id = "scap_%s_comp_%s" % (
199
        ID_NS, os.path.basename(component_file_name))
200
    component = ET.SubElement(
201
        ds_collection, "{%s}component" % datastream_namespace)
202
    component.set("id", component_id)
203
    component.set("timestamp", get_timestamp(component_file_name))
204
    component_ref = ET.SubElement(
205
        component_ref_parent, "{%s}component-ref" % datastream_namespace)
206
    component_ref_id = "scap_%s_cref_%s" % (
207
        ID_NS, os.path.basename(component_file_name))
208
    component_ref.set("id", component_ref_id)
209
    component_ref.set("{%s}href" % xlink_namespace, "#" + component_id)
210
    component_root = ET.parse(component_file_name).getroot()
211
    component.append(component_root)
212
    if dependencies:
213
        create_catalog(component_ref, dependencies)
214
215
216
def create_catalog(component_ref, dependencies):
217
    catalog = ET.SubElement(component_ref, "{%s}catalog" % cat_namespace)
218
    for dep in dependencies:
219
        uri = ET.SubElement(catalog, "{%s}uri" % cat_namespace)
220
        dep_base_name = os.path.basename(dep)
221
        uri.set("name", dep_base_name)
222
        uri.set("uri", "#scap_%s_cref_%s" % (ID_NS, dep_base_name))
223
224
225
def compose_ds(
226
        xccdf_file_name, oval_file_name, ocil_file_name,
227
        cpe_dict_file_name, cpe_oval_file_name, sce_enabled):
228
    ds_collection = ET.Element(
229
        "{%s}data-stream-collection" % datastream_namespace)
230
    name = "from_xccdf_" + os.path.basename(xccdf_file_name)
231
    ds_collection.set("id", "scap_%s_collection_%s" % (ID_NS, name))
232
    ds_collection.set("schematron-version", "1.2")
233
    ds = ET.SubElement(ds_collection, "{%s}data-stream" % datastream_namespace)
234
    ds.set("id", "scap_%s_datastream_%s" % (ID_NS, name))
235
    ds.set("scap-version", "1.2")
236
    ds.set("use-case", "OTHER")
237
    dictionaries = ET.SubElement(ds, "{%s}dictionaries" % datastream_namespace)
238
    checklists = ET.SubElement(ds, "{%s}checklists" % datastream_namespace)
239
    checks = ET.SubElement(ds, "{%s}checks" % datastream_namespace)
240
    cpe_dict_dependencies = [cpe_oval_file_name]
241
    add_component(
242
        ds_collection, dictionaries, cpe_dict_file_name, cpe_dict_dependencies)
243
    xccdf_dependencies = [oval_file_name, ocil_file_name, cpe_oval_file_name]
244
    add_component(
245
        ds_collection, checklists, xccdf_file_name, xccdf_dependencies)
246
    add_component(ds_collection, checks, oval_file_name)
247
    add_component(ds_collection, checks, ocil_file_name)
248
    add_component(ds_collection, checks, cpe_oval_file_name)
249
    if sce_enabled:
250
        sce_check_files = collect_sce_checks(ds_collection)
251
        refdir = os.path.dirname(oval_file_name)
252
        embed_sce_checks_in_datastream(ds_collection, checklists, sce_check_files, refdir)
253
    return ET.ElementTree(ds_collection)
254
255
256
def upgrade_ds_to_scap_13(ds):
257
    dsc_el = ds.getroot()
258
    dsc_el.set("schematron-version", "1.3")
259
    ds_el = ds.find("{%s}data-stream" % datastream_namespace)
260
    ds_el.set("scap-version", '1.3')
261
262
    # Move reference to remote OVAL content to a source data stream component
263
    move_patches_up_to_date_to_source_data_stream_component(ds)
264
    return ds
265
266
267
if __name__ == "__main__":
268
    args = parse_args()
269
    ssg.xml.register_namespaces()
270
    ds = compose_ds(
271
        args.xccdf, args.oval, args.ocil, args.cpe_dict, args.cpe_oval, args.enable_sce)
272
    if args.output_12:
273
        ds.write(args.output_12)
274
    ds_13 = upgrade_ds_to_scap_13(ds)
275
    ds_13.write(args.output_13)
276