1 | #!/usr/bin/python3 |
||
2 | |||
3 | import argparse |
||
4 | import os |
||
5 | import sys |
||
6 | import time |
||
7 | import xml.etree.ElementTree as ET |
||
8 | |||
9 | from ssg.build_sce import collect_sce_checks |
||
10 | from ssg.constants import ( |
||
11 | cat_namespace, datastream_namespace, oval_namespace, sce_namespace, |
||
12 | XCCDF12_NS, xlink_namespace) |
||
13 | import ssg.xml |
||
14 | |||
15 | try: |
||
16 | from urllib.parse import urlparse |
||
17 | except ImportError: |
||
18 | from urlparse import urlparse |
||
19 | |||
20 | |||
21 | ID_NS = "org.open-scap" |
||
22 | component_ref_prefix = "#scap_org.open-scap_cref_" |
||
23 | |||
24 | |||
25 | # Inspired by openscap ds_sds_mangle_filepath() function |
||
def mangle_path(path):
    """Return ``path`` with every '/', '@' and '~' replaced by '-'.

    The result is used as the trailing part of component and component-ref
    identifiers.
    """
    for unwanted in ('/', '@', '~'):
        path = path.replace(unwanted, '-')
    return path
||
31 | |||
32 | |||
33 | # From the list generated by collect_sce_checks, extract the path to the check content, |
||
34 | # and embed the script into the datastream |
||
def embed_sce_checks_in_datastream(datastreamtree, checklists, sce_files, refdir):
    """Embed SCE check scripts into the data stream as extended components.

    For every relative path in ``sce_files`` the script is read from
    ``refdir`` (as UTF-8 text) and then:

    * stored in a new ``extended-component`` appended to ``datastreamtree``;
    * referenced by a new ``component-ref`` appended to ``checklists``;
    * registered in the catalog of the first ``component-ref`` found in
      ``checklists``, mapping the script path to that new ``component-ref``.

    Nothing is returned; the given elements are modified in place.
    """
    ext_component_tag = '{%s}extended-component' % datastream_namespace
    component_ref_tag = '{%s}component-ref' % datastream_namespace
    script_tag = '{%s}script' % sce_namespace
    href_attr = '{%s}href' % xlink_namespace
    catalog_tag = '{%s}catalog' % cat_namespace
    uri_tag = '{%s}uri' % cat_namespace

    for sce_file in sce_files:
        script_path = os.path.join(refdir, sce_file)
        mangled_name = mangle_path(sce_file)

        with open(script_path, 'rt', encoding='utf8') as script_fd:
            script_text = script_fd.read()

        # The extended component carries the script body itself.
        ecomp_id = "scap_{}_ecomp_{}".format(ID_NS, mangled_name)
        ecomp_attrs = {'id': ecomp_id, 'timestamp': get_timestamp(script_path)}
        ecomp = ET.SubElement(datastreamtree, ext_component_tag, attrib=ecomp_attrs)
        ET.SubElement(ecomp, script_tag).text = script_text

        # A component-ref in the checklists points at the extended component.
        cref_id = "scap_{}_cref_{}".format(ID_NS, mangled_name)
        ET.SubElement(
            checklists, component_ref_tag,
            attrib={'id': cref_id, href_attr: '#' + ecomp_id})

        # The catalog of the first checklists component-ref maps the script
        # path to the component-ref created above.
        first_cref = checklists.find(component_ref_tag)
        catalog = first_cref.find(catalog_tag)
        ET.SubElement(
            catalog, uri_tag,
            attrib={'name': sce_file, 'uri': '#' + cref_id})
||
73 | |||
74 | |||
def move_patches_up_to_date_to_source_data_stream_component(datastreamtree):
    """Turn the remote OVAL feed of security_patches_up_to_date into a component-ref.

    The Benchmark component is the one referenced by the first
    ``component-ref`` under ``checklists``. If it holds a Rule whose ID ends
    with ``rule_security_patches_up_to_date`` and that Rule has an OVAL
    check, then:

    * the check gets ``multi-check="true"``;
    * the ``href`` of its ``check-content-ref`` is replaced by a name derived
      from the path of the original URL;
    * a catalog ``uri`` for that name is added to the Benchmark
      ``component-ref``;
    * a ``component-ref`` pointing at the original URL is added to ``checks``.

    The function returns early, leaving the remaining steps undone, when
    there is no such OVAL check, when the catalog already has a ``uri`` with
    that name, or when ``checks`` already has the ``component-ref``.

    The process exits with status 1 when the referenced Benchmark component
    cannot be found. The tree is modified in place; nothing is returned.
    """
    cref_tag = "{%s}component-ref" % datastream_namespace
    href_attr = '{%s}href' % xlink_namespace

    benchmark_cref = datastreamtree.find(
        ".//{%s}checklists" % datastream_namespace).find(cref_tag)
    benchmark_cref_id = benchmark_cref.get('id')
    # Dropping the leading '#' of the href yields the referenced component ID.
    benchmark_component_id = benchmark_cref.get(href_attr)[1:]

    # If several components carry the ID, the last one in document order wins.
    matching_components = [
        candidate
        for candidate in datastreamtree.findall(
            ".//{%s}component" % datastream_namespace)
        if candidate.get('id') == benchmark_component_id
    ]
    if not matching_components:
        # Something strange happened
        sys.stderr.write(
            "Couldn't find <component> %s referenced by <component-ref> %s" % (
                benchmark_component_id, benchmark_cref_id))
        sys.exit(1)
    benchmark_component = matching_components[-1]

    # Pick the first OVAL <xccdf:check> of the security_patches_up_to_date
    # Rule; a later matching Rule with an OVAL check overrides an earlier one.
    oval_check = None
    for rule in benchmark_component.findall(".//{%s}Rule" % XCCDF12_NS):
        if not rule.get('id').endswith('rule_security_patches_up_to_date'):
            continue
        first_oval = next(
            (chk for chk in rule.findall("{%s}check" % XCCDF12_NS)
             if chk.get('system') == oval_namespace),
            None)
        if first_oval is not None:
            oval_check = first_oval

    if oval_check is None:
        # No security_patches_up_to_date Rule with an OVAL check: nothing to do.
        return

    # SCAP 1.3 demands multi-check true if the Rule
    # security_patches_up_to_date is evaluated by multiple OVAL patch class
    # definitions. See 3.2.4.3, SCAP 1.3 standard (NIST.SP.800-126r3)
    oval_check.set('multi-check', 'true')

    content_ref = oval_check.find('{%s}check-content-ref' % XCCDF12_NS)
    feed_url = content_ref.get('href')

    # The component name and URI come from the path part of the URL. The
    # first character of that path (the leading '/') is dropped before
    # mangling so the component-ref ID does not contain a "_-" sequence.
    feed_name = mangle_path(urlparse(feed_url).path[1:])
    feed_uri = component_ref_prefix + feed_name

    # Make @href refer to the data stream component name.
    content_ref.set('href', feed_name)

    # Register the name in the catalog of the Benchmark component-ref,
    # unless an entry with that name is already there.
    uri_tag = "{%s}uri" % cat_namespace
    catalog = benchmark_cref.find('{%s}catalog' % cat_namespace)
    if any(entry.get('name') == feed_name for entry in catalog.findall(uri_tag)):
        return
    ET.SubElement(catalog, uri_tag, attrib={'name': feed_name, 'uri': feed_uri})

    # The component-ref ID is the catalog uri without its leading '#'.
    feed_cref_id = feed_uri[1:]

    # Add the component-ref to the data stream's checks unless present.
    ds_checks = datastreamtree.find(".//{%s}checks" % datastream_namespace)
    if any(ref.get('id') == feed_cref_id for ref in ds_checks.findall(cref_tag)):
        return
    ET.SubElement(
        ds_checks, cref_tag,
        attrib={'id': feed_cref_id, href_attr: feed_url})
||
166 | |||
167 | |||
def parse_args(argv=None):
    """Parse the command-line options of the data stream composer.

    Args:
        argv: Optional list of argument strings to parse. ``None`` (the
            default) makes argparse read the process arguments, which is what
            existing callers get.

    Returns:
        The ``argparse.Namespace`` with the attributes ``xccdf``, ``oval``,
        ``ocil``, ``cpe_dict``, ``cpe_oval``, ``enable_sce``, ``output_12``
        and ``output_13``. Only ``--output-13`` is required.
    """
    # Adjacent literals are used instead of a backslash continuation inside
    # the string, which would pull the continuation line's indentation into
    # the description text.
    parser = argparse.ArgumentParser(
        description=(
            "Compose an SCAP source data stream from individual SCAP "
            "components"))
    parser.add_argument("--xccdf", help="XCCDF 1.2 checklist file name")
    parser.add_argument("--oval", help="OVAL file name")
    parser.add_argument("--ocil", help="OCIL file name")
    parser.add_argument("--cpe-dict", help="CPE dictionary file name")
    parser.add_argument("--cpe-oval", help="CPE OVAL file name")
    parser.add_argument(
        "--enable-sce", action='store_true', help="Enable building sce data")
    parser.add_argument(
        "--output-12", help="Output SCAP 1.2 source data stream file name")
    parser.add_argument(
        "--output-13", required=True,
        help="Output SCAP 1.3 source data stream file name")
    return parser.parse_args(argv)
||
183 | |||
184 | |||
def get_timestamp(file_name):
    """Return the timestamp to record for a data stream component.

    When the ``SOURCE_DATE_EPOCH`` environment variable is set and non-empty,
    its value is used instead of the file's modification time and is rendered
    in UTC, so the result does not depend on the time zone of the build
    machine. Otherwise the modification time of ``file_name`` is rendered in
    local time.

    Args:
        file_name: Path of the file whose modification time is the fallback.

    Returns:
        The time formatted as ``YYYY-MM-DDTHH:MM:SS`` (no zone designator).

    Raises:
        ValueError: If ``SOURCE_DATE_EPOCH`` is not a number.
        OSError: If the modification time of ``file_name`` cannot be read.
    """
    source_date_epoch = os.getenv("SOURCE_DATE_EPOCH")
    if source_date_epoch:
        # A pinned build date must give the same text in every time zone.
        broken_down = time.gmtime(float(source_date_epoch))
    else:
        broken_down = time.localtime(os.path.getmtime(file_name))
    return time.strftime("%Y-%m-%dT%H:%M:%S", broken_down)
||
193 | |||
194 | |||
def add_component(
        ds_collection, component_ref_parent, component_file_name,
        dependencies=None):
    """Add the XML file ``component_file_name`` to the data stream collection.

    A ``component`` holding the parsed root of the file is appended to
    ``ds_collection`` and a ``component-ref`` pointing at it is appended to
    ``component_ref_parent``. Both IDs are built from the base name of the
    file. When ``dependencies`` is non-empty, a catalog listing them is
    created inside the new ``component-ref``.
    """
    base_name = os.path.basename(component_file_name)
    comp_id = "scap_%s_comp_%s" % (ID_NS, base_name)
    cref_id = "scap_%s_cref_%s" % (ID_NS, base_name)

    component = ET.SubElement(
        ds_collection, "{%s}component" % datastream_namespace)
    component.set("id", comp_id)
    component.set("timestamp", get_timestamp(component_file_name))

    component_ref = ET.SubElement(
        component_ref_parent, "{%s}component-ref" % datastream_namespace)
    component_ref.set("id", cref_id)
    component_ref.set("{%s}href" % xlink_namespace, "#" + comp_id)

    # The document is parsed only after both elements exist in the tree.
    component.append(ET.parse(component_file_name).getroot())

    if dependencies:
        create_catalog(component_ref, dependencies)
||
214 | |||
215 | |||
def create_catalog(component_ref, dependencies):
    """Create a catalog under ``component_ref`` listing ``dependencies``.

    Each dependency gets one ``uri`` entry whose ``name`` is the base name of
    the dependency path and whose ``uri`` is the ``#scap_<ID_NS>_cref_<name>``
    reference built from that same base name.
    """
    uri_tag = "{%s}uri" % cat_namespace
    catalog = ET.SubElement(component_ref, "{%s}catalog" % cat_namespace)
    for dependency in dependencies:
        entry = ET.SubElement(catalog, uri_tag)
        base_name = os.path.basename(dependency)
        entry.set("name", base_name)
        entry.set("uri", "#scap_%s_cref_%s" % (ID_NS, base_name))
||
223 | |||
224 | |||
def compose_ds(
        xccdf_file_name, oval_file_name, ocil_file_name,
        cpe_dict_file_name, cpe_oval_file_name, sce_enabled):
    """Build an SCAP 1.2 source data stream from the individual files.

    The collection gets one data stream with ``dictionaries`` (the CPE
    dictionary), ``checklists`` (the XCCDF file) and ``checks`` (OVAL, OCIL
    and CPE OVAL). When ``sce_enabled`` is true, the SCE checks collected
    from the collection are embedded as well, read relative to the directory
    of ``oval_file_name``.

    Returns:
        An ``ElementTree`` whose root is the ``data-stream-collection``.
    """
    stream_name = "from_xccdf_" + os.path.basename(xccdf_file_name)

    ds_collection = ET.Element(
        "{%s}data-stream-collection" % datastream_namespace)
    ds_collection.set("id", "scap_%s_collection_%s" % (ID_NS, stream_name))
    ds_collection.set("schematron-version", "1.2")

    data_stream = ET.SubElement(
        ds_collection, "{%s}data-stream" % datastream_namespace)
    data_stream.set("id", "scap_%s_datastream_%s" % (ID_NS, stream_name))
    data_stream.set("scap-version", "1.2")
    data_stream.set("use-case", "OTHER")

    dictionaries = ET.SubElement(
        data_stream, "{%s}dictionaries" % datastream_namespace)
    checklists = ET.SubElement(
        data_stream, "{%s}checklists" % datastream_namespace)
    checks = ET.SubElement(data_stream, "{%s}checks" % datastream_namespace)

    # Components that refer to other components carry a dependency list.
    add_component(
        ds_collection, dictionaries, cpe_dict_file_name,
        [cpe_oval_file_name])
    add_component(
        ds_collection, checklists, xccdf_file_name,
        [oval_file_name, ocil_file_name, cpe_oval_file_name])
    for check_file_name in (oval_file_name, ocil_file_name, cpe_oval_file_name):
        add_component(ds_collection, checks, check_file_name)

    if sce_enabled:
        sce_check_files = collect_sce_checks(ds_collection)
        embed_sce_checks_in_datastream(
            ds_collection, checklists, sce_check_files,
            os.path.dirname(oval_file_name))

    return ET.ElementTree(ds_collection)
||
254 | |||
255 | |||
def upgrade_ds_to_scap_13(ds):
    """Upgrade the data stream tree ``ds`` to SCAP 1.3 in place.

    The version attributes of the collection and of the data stream are set
    to 1.3, and the reference to remote OVAL content is moved to a source
    data stream component.

    Returns:
        The same ``ds`` object that was passed in.
    """
    ds.getroot().set("schematron-version", "1.3")
    data_stream = ds.find("{%s}data-stream" % datastream_namespace)
    data_stream.set("scap-version", '1.3')

    # Move reference to remote OVAL content to a source data stream component
    move_patches_up_to_date_to_source_data_stream_component(ds)
    return ds
||
265 | |||
266 | |||
if __name__ == "__main__":
    # Script entry point: compose the data stream, optionally write the
    # SCAP 1.2 flavour, then upgrade the same tree and write it as SCAP 1.3.
    options = parse_args()
    ssg.xml.register_namespaces()
    data_stream_tree = compose_ds(
        xccdf_file_name=options.xccdf,
        oval_file_name=options.oval,
        ocil_file_name=options.ocil,
        cpe_dict_file_name=options.cpe_dict,
        cpe_oval_file_name=options.cpe_oval,
        sce_enabled=options.enable_sce)
    # The 1.2 output has to be written before the in-place upgrade.
    if options.output_12:
        data_stream_tree.write(options.output_12)
    upgrade_ds_to_scap_13(data_stream_tree).write(options.output_13)
||
276 |