1 | from __future__ import absolute_import |
||
2 | from __future__ import print_function |
||
3 | |||
4 | from copy import deepcopy |
||
5 | import collections |
||
6 | import datetime |
||
7 | import json |
||
8 | import os |
||
9 | import os.path |
||
10 | import re |
||
11 | import sys |
||
12 | import glob |
||
13 | |||
14 | |||
15 | import ssg.build_remediations |
||
16 | import ssg.components |
||
17 | from .build_cpe import CPEALLogicalTest, CPEALCheckFactRef, ProductCPEs |
||
18 | from .constants import (XCCDF12_NS, |
||
19 | OSCAP_BENCHMARK, |
||
20 | OSCAP_GROUP, |
||
21 | OSCAP_RULE, |
||
22 | OSCAP_VALUE, |
||
23 | SCE_SYSTEM, |
||
24 | cce_uri, |
||
25 | dc_namespace, |
||
26 | ocil_cs, |
||
27 | ocil_namespace, |
||
28 | oval_namespace, |
||
29 | xhtml_namespace, |
||
30 | xsi_namespace, |
||
31 | timestamp, |
||
32 | SSG_BENCHMARK_LATEST_URI, |
||
33 | SSG_PROJECT_NAME, |
||
34 | SSG_REF_URIS, |
||
35 | PREFIX_TO_NS, |
||
36 | FIX_TYPE_TO_SYSTEM |
||
37 | ) |
||
38 | from .rules import get_rule_dir_yaml, is_rule_dir |
||
39 | from .rule_yaml import parse_prodtype |
||
40 | |||
41 | from .cce import is_cce_format_valid, is_cce_value_valid |
||
42 | from .yaml import DocumentationNotComplete, open_and_macro_expand |
||
43 | from .utils import required_key, mkdir_p |
||
44 | |||
45 | from .xml import ElementTree as ET, register_namespaces, parse_file |
||
46 | import ssg.build_stig |
||
47 | |||
48 | from .entities.common import add_sub_element, make_items_product_specific, \ |
||
49 | XCCDFEntity, Templatable |
||
50 | from .entities.profile import Profile, ProfileWithInlinePolicies |
||
51 | |||
52 | |||
def reorder_according_to_ordering(unordered, ordering, regex=None):
    """Return the items of ``unordered`` with prioritized items first.

    Items are eligible for prioritization when ``regex`` matches them at the
    start of the string. When ``regex`` is not given, it is built as an
    alternation of the ``ordering`` keywords. Eligible items are emitted
    keyword by keyword (in the order of ``ordering``) whenever the keyword
    occurs anywhere in the item; each item is emitted at most once. Everything
    that was not prioritized follows, sorted.
    """
    if regex is None:
        regex = "|".join("({0})".format(keyword) for keyword in ordering)
    pattern = re.compile(regex)

    # Candidates keep the relative order they had in the input.
    candidates = [item for item in unordered if pattern.match(item)]
    remaining = set(unordered)

    prioritized = []
    for keyword in ordering:
        for candidate in candidates:
            if candidate in remaining and keyword in candidate:
                prioritized.append(candidate)
                remaining.discard(candidate)

    return prioritized + sorted(remaining)
||
69 | |||
70 | |||
def add_warning_elements(element, warnings):
    """Append one XCCDF ``warning`` sub-element to ``element`` per warning.

    ``warnings`` is a list of single-entry dictionaries rather than one
    dictionary, so that several warnings may share the same category, which
    is valid in SCAP and in our content:

        warnings:
            - general: Some general warning
            - general: Some other general warning
            - general: |-
                Some really long multiline general warning

    The key of each dictionary becomes the ``category`` attribute and the
    value becomes the warning content. Only the first key/value pair of each
    dictionary is used.
    """
    for warning_dict in warnings:
        category, text = list(warning_dict.items())[0]
        warning = add_sub_element(element, "warning", XCCDF12_NS, text)
        warning.set("category", category)
||
87 | |||
88 | |||
def add_nondata_subelements(element, subelement, attribute, attr_data):
    """Add multiple iterations of a subelement that contains an attribute but no data

    For example, <requires id="my_required_id"/>

    One ``subelement`` (in the XCCDF 1.2 namespace) is appended to ``element``
    for every entry of ``attr_data``; the entry is stored in ``attribute``.
    """
    qualified_tag = "{%s}%s" % (XCCDF12_NS, subelement)
    for data in attr_data:
        ET.SubElement(element, qualified_tag).set(attribute, data)
||
95 | |||
96 | |||
def check_warnings(xccdf_structure):
    """Validate the ``warnings`` attribute of ``xccdf_structure``.

    Raises ValueError when any entry of ``xccdf_structure.warnings`` does not
    contain exactly one key/value pair.
    """
    if any(len(warning_dict) != 1 for warning_dict in xccdf_structure.warnings):
        raise ValueError(
            "Only one key/value pair should exist for each warnings dictionary")
||
102 | |||
103 | |||
def add_reference_elements(element, references, ref_uri_dict):
    """Append XCCDF ``reference`` sub-elements to ``element``.

    ``references`` maps a reference type to a comma-separated string of
    reference values. One ``reference`` element is created per value; its
    ``href`` attribute is looked up in ``ref_uri_dict`` by reference type and
    its text is the value itself.

    Values of the ``srg`` type are dispatched by prefix: ``SRG-OS-`` values
    use the ``os-srg`` URI and ``SRG-APP-`` values use the ``app-srg`` URI.

    Raises ValueError when an ``srg`` value has an unknown prefix or when the
    reference type has no entry in ``ref_uri_dict``.
    """
    for ref_type, ref_vals in references.items():
        for ref_val in ref_vals.split(","):
            # This assumes that a single srg key may have items from multiple SRG types
            if ref_type == 'srg':
                if ref_val.startswith('SRG-OS-'):
                    ref_href = ref_uri_dict['os-srg']
                elif ref_val.startswith('SRG-APP-'):
                    ref_href = ref_uri_dict['app-srg']
                else:
                    raise ValueError("SRG {0} doesn't have a URI defined.".format(ref_val))
            else:
                try:
                    ref_href = ref_uri_dict[ref_type]
                except KeyError:
                    # This is a module-level function, so there is no ``self``
                    # to take a rule ID from; report the ``id`` attribute of
                    # the element being populated instead.
                    msg = (
                        "Error processing reference {0}: {1} in Rule {2}."
                        .format(ref_type, ref_vals, element.get("id")))
                    raise ValueError(msg)

            ref = ET.SubElement(element, '{%s}reference' % XCCDF12_NS)
            ref.set("href", ref_href)
            ref.text = ref_val
||
127 | |||
128 | |||
def add_benchmark_metadata(element, contributors_file):
    """Append an XCCDF ``metadata`` element with Dublin Core children.

    The metadata element receives, in this order: a ``publisher`` and a
    ``creator`` (both set to the project name), one ``contributor`` per
    ``contributor`` element found in the parsed ``contributors_file``, and a
    ``source`` pointing at the latest-benchmark URI.
    """
    metadata = ET.SubElement(element, "{%s}metadata" % XCCDF12_NS)

    def add_dc_child(tag, text):
        # All children live in the Dublin Core namespace.
        child = ET.SubElement(metadata, "{%s}%s" % (dc_namespace, tag))
        child.text = text

    add_dc_child("publisher", SSG_PROJECT_NAME)
    add_dc_child("creator", SSG_PROJECT_NAME)

    for entry in parse_file(contributors_file).iter('contributor'):
        add_dc_child("contributor", entry.text)

    add_dc_child("source", SSG_BENCHMARK_LATEST_URI)
||
145 | |||
146 | |||
class Value(XCCDFEntity):
    """Represents XCCDF Value
    """
    # Optional keys and their default factories, on top of the common
    # entity keys.
    KEYS = dict(
        description=lambda: "",
        type=lambda: "",
        operator=lambda: "equals",
        interactive=lambda: False,
        options=lambda: dict(),
        warnings=lambda: list(),
        ** XCCDFEntity.KEYS
    )

    MANDATORY_KEYS = {
        "title",
        "description",
        "type",
    }

    @classmethod
    def process_input_dict(cls, input_contents, env_yaml, product_cpes=None):
        """Normalize and validate the raw dictionary of a Value definition.

        The ``interactive`` entry is converted to a boolean (true only when
        it reads "true", case-insensitively) and ``operator`` is checked
        against the list of supported operators.

        Raises ValueError when the operator is not supported.
        """
        # The entry may be a string ("true") or an actual boolean when the
        # YAML author wrote an unquoted true/false; str() handles both,
        # whereas calling .lower() on a bool would raise AttributeError.
        input_contents["interactive"] = (
            str(input_contents.get("interactive", "false")).lower() == "true")

        data = super(Value, cls).process_input_dict(input_contents, env_yaml)

        possible_operators = ["equals", "not equal", "greater than",
                              "less than", "greater than or equal",
                              "less than or equal", "pattern match"]

        if data["operator"] not in possible_operators:
            raise ValueError(
                "Found an invalid operator value '%s'. "
                "Expected one of: %s"
                % (data["operator"], ", ".join(possible_operators))
            )

        return data

    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None, product_cpes=None):
        """Load a Value from ``yaml_file`` and validate its warnings."""
        value = super(Value, cls).from_yaml(yaml_file, env_yaml)

        check_warnings(value)

        return value

    def to_xml_element(self):
        """Build and return the XCCDF ``Value`` element for this entity."""
        value = ET.Element('{%s}Value' % XCCDF12_NS)
        value.set('id', OSCAP_VALUE + self.id_)
        value.set('type', self.type)
        if self.operator != "equals":  # equals is the default
            value.set('operator', self.operator)
        if self.interactive:  # False is the default
            value.set('interactive', 'true')
        title = ET.SubElement(value, '{%s}title' % XCCDF12_NS)
        title.text = self.title
        add_sub_element(value, 'description', XCCDF12_NS, self.description)
        add_warning_elements(value, self.warnings)

        for selector, option in self.options.items():
            # do not confuse Value with big V with value with small v
            # value is child element of Value
            value_small = ET.SubElement(value, '{%s}value' % XCCDF12_NS)
            # by XCCDF spec, default value is value without selector
            if selector != "default":
                value_small.set('selector', str(selector))
            value_small.text = str(option)

        return value
||
217 | |||
218 | |||
class Benchmark(XCCDFEntity):
    """Represents XCCDF Benchmark
    """
    # Optional keys and their default factories, on top of the common
    # entity keys.
    KEYS = dict(
        status=lambda: "",
        description=lambda: "",
        notice_id=lambda: "",
        notice_description=lambda: "",
        front_matter=lambda: "",
        rear_matter=lambda: "",
        cpes=lambda: list(),
        version=lambda: "",
        profiles=lambda: list(),
        values=lambda: dict(),
        groups=lambda: dict(),
        rules=lambda: dict(),
        platforms=lambda: dict(),
        product_cpe_names=lambda: list(),
        ** XCCDFEntity.KEYS
    )

    MANDATORY_KEYS = {
        "title",
        "status",
        "description",
        "front_matter",
        "rear_matter",
    }

    GENERIC_FILENAME = "benchmark.yml"

    def load_entities(self, rules_by_id, values_by_id, groups_by_id):
        """Replace empty rule/value/group placeholders with loaded entities.

        Every falsy entry of ``self.rules``, ``self.values`` and
        ``self.groups`` is replaced by the entity with the same ID from the
        corresponding ``*_by_id`` mapping. A missing ID raises KeyError.
        """
        for rid, val in self.rules.items():
            if not val:
                self.rules[rid] = rules_by_id[rid]

        for vid, val in self.values.items():
            if not val:
                self.values[vid] = values_by_id[vid]

        for gid, val in self.groups.items():
            if not val:
                self.groups[gid] = groups_by_id[gid]

    @classmethod
    def process_input_dict(cls, input_contents, env_yaml, product_cpes):
        """Normalize the raw benchmark dictionary.

        The dashed ``front-matter``/``rear-matter`` keys are renamed to their
        underscore forms, and the mandatory ``notice`` mapping is flattened
        into ``notice_id`` and ``notice_description``.
        """
        input_contents["front_matter"] = input_contents.pop("front-matter")
        input_contents["rear_matter"] = input_contents.pop("rear-matter")

        data = super(Benchmark, cls).process_input_dict(input_contents, env_yaml, product_cpes)

        notice_contents = required_key(input_contents, "notice")
        del input_contents["notice"]

        data["notice_id"] = required_key(notice_contents, "id")
        del notice_contents["id"]

        data["notice_description"] = required_key(notice_contents, "description")
        del notice_contents["description"]

        return data

    def represent_as_dict(self):
        """Return the dictionary form, using the dashed matter keys again."""
        data = super(Benchmark, self).represent_as_dict()
        data["rear-matter"] = data.pop("rear_matter")
        data["front-matter"] = data.pop("front_matter")
        return data

    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None, product_cpes=None):
        """Load a Benchmark from ``yaml_file``.

        With ``env_yaml`` the benchmark ID, version and product CPE data are
        taken from the product environment; otherwise placeholder ID and
        version are used and no product CPE data is attached.
        """
        benchmark = super(Benchmark, cls).from_yaml(yaml_file, env_yaml)
        if env_yaml:
            benchmark.product_cpe_names = product_cpes.get_product_cpe_names()
            benchmark.product_cpes = product_cpes
            benchmark.id_ = env_yaml["benchmark_id"]
            benchmark.version = env_yaml["ssg_version_str"]
        else:
            benchmark.id_ = "product-name"
            benchmark.version = "0.0"

        return benchmark

    def add_profiles_from_dir(self, dir_, env_yaml, product_cpes):
        """Load every ``*.profile`` file found directly in ``dir_``.

        Files with another extension are reported on stderr and skipped.
        Profiles whose documentation is not complete, or that load as None,
        are skipped silently. Any other loading error is re-raised as
        RuntimeError naming the offending file.
        """
        for dir_item in sorted(os.listdir(dir_)):
            dir_item_path = os.path.join(dir_, dir_item)
            if not os.path.isfile(dir_item_path):
                continue

            _, ext = os.path.splitext(os.path.basename(dir_item_path))
            if ext != '.profile':
                sys.stderr.write(
                    "Encountered file '%s' while looking for profiles, "
                    "extension '%s' is unknown. Skipping..\n"
                    % (dir_item, ext)
                )
                continue

            try:
                new_profile = ProfileWithInlinePolicies.from_yaml(
                    dir_item_path, env_yaml, product_cpes)
            except DocumentationNotComplete:
                continue
            except Exception as exc:
                msg = ("Error building profile from '{fname}': '{error}'"
                       .format(fname=dir_item_path, error=str(exc)))
                raise RuntimeError(msg)
            if new_profile is None:
                continue

            self.profiles.append(new_profile)

    def unselect_empty_groups(self):
        """Ask every profile to unselect the empty groups of this benchmark."""
        for p in self.profiles:
            p.unselect_empty_groups(self)

    def to_xml_element(self, env_yaml=None, product_cpes=None):
        """Build and return the XCCDF ``Benchmark`` element.

        The element contains, in order: status, title, description, notice,
        front and rear matter, the CPE platform specification (only when
        platforms exist), the product platforms, version, metadata, profiles,
        values, groups and finally the rules attached to the benchmark.
        """
        root = ET.Element('{%s}Benchmark' % XCCDF12_NS)
        root.set('id', OSCAP_BENCHMARK + self.id_)
        root.set('xmlns:xsi', 'http://www.w3.org/2001/XMLSchema-instance')
        root.set(
            'xsi:schemaLocation',
            'http://checklists.nist.gov/xccdf/1.2 xccdf-1.2.xsd')
        root.set('style', 'SCAP_1.2')
        root.set('resolved', 'true')
        root.set('xml:lang', 'en-US')
        status = ET.SubElement(root, '{%s}status' % XCCDF12_NS)
        status.set('date', datetime.date.today().strftime("%Y-%m-%d"))
        status.text = self.status
        add_sub_element(root, "title", XCCDF12_NS, self.title)
        add_sub_element(root, "description", XCCDF12_NS, self.description)
        notice = add_sub_element(
            root, "notice", XCCDF12_NS, self.notice_description)
        notice.set('id', self.notice_id)
        add_sub_element(root, "front-matter", XCCDF12_NS, self.front_matter)
        add_sub_element(root, "rear-matter", XCCDF12_NS, self.rear_matter)

        # ``product_cpes`` is only attached by from_yaml() when an env_yaml
        # was given, so tolerate its absence instead of raising
        # AttributeError.
        own_product_cpes = getattr(self, "product_cpes", None)
        cpe_platforms = own_product_cpes.platforms if own_product_cpes is not None else {}
        # if there are no platforms, do not output platform-specification at all
        if len(cpe_platforms) > 0:
            cpe_platform_spec = ET.Element(
                "{%s}platform-specification" % PREFIX_TO_NS["cpe-lang"])
            for platform in cpe_platforms.values():
                cpe_platform_spec.append(platform.to_xml_element())
            root.append(cpe_platform_spec)

        # The Benchmark applicability is determined by the CPEs
        # defined in the product.yml
        for cpe_name in self.product_cpe_names:
            plat = ET.SubElement(root, "{%s}platform" % XCCDF12_NS)
            plat.set("idref", cpe_name)

        version = ET.SubElement(root, '{%s}version' % XCCDF12_NS)
        version.text = self.version
        version.set('update', SSG_BENCHMARK_LATEST_URI)

        contributors_file = os.path.join(os.path.dirname(__file__), "../Contributors.xml")
        add_benchmark_metadata(root, contributors_file)

        for profile in self.profiles:
            root.append(profile.to_xml_element())

        for value in self.values.values():
            root.append(value.to_xml_element())

        groups_in_bench = list(self.groups.keys())
        priority_order = ["system", "services"]
        groups_in_bench = reorder_according_to_ordering(groups_in_bench, priority_order)

        # Make system group the first, followed by services group
        for group_id in groups_in_bench:
            group = self.groups.get(group_id)
            # Products using application benchmark don't have system or services group
            if group is not None:
                root.append(group.to_xml_element(env_yaml))

        for rule in self.rules.values():
            root.append(rule.to_xml_element(env_yaml))

        return root

    def to_file(self, file_name, env_yaml=None):
        """Serialize the benchmark as XML into ``file_name``."""
        root = self.to_xml_element(env_yaml)
        tree = ET.ElementTree(root)
        tree.write(file_name)

    def add_value(self, value):
        """Register ``value`` under its ID; None is ignored."""
        if value is None:
            return
        self.values[value.id_] = value

    # The benchmark is also considered a group, so this function signature needs to match
    # Group()'s add_group()
    def add_group(self, group, env_yaml=None, product_cpes=None):
        """Register ``group`` under its ID; None is ignored."""
        if group is None:
            return
        self.groups[group.id_] = group

    def add_rule(self, rule):
        """Register ``rule`` under its ID; None is ignored."""
        if rule is None:
            return
        self.rules[rule.id_] = rule

    def to_xccdf(self):
        """We can easily extend this script to generate a valid XCCDF instead
        of SSG SHORTHAND.
        """
        raise NotImplementedError

    def __str__(self):
        return self.id_
||
432 | |||
433 | |||
class Group(XCCDFEntity):
    """Represents XCCDF Group
    """

    GENERIC_FILENAME = "group.yml"

    # Optional keys and their default factories, on top of the common
    # entity keys.
    KEYS = dict(
        prodtype=lambda: "all",
        description=lambda: "",
        warnings=lambda: list(),
        requires=lambda: list(),
        conflicts=lambda: list(),
        values=lambda: dict(),
        groups=lambda: dict(),
        rules=lambda: dict(),
        platform=lambda: "",
        platforms=lambda: set(),
        inherited_platforms=lambda: set(),
        cpe_platform_names=lambda: set(),
        ** XCCDFEntity.KEYS
    )

    # NOTE(review): "status", "front_matter" and "rear_matter" are not among
    # the keys declared above and look copied from Benchmark - confirm
    # whether they are intended here.
    MANDATORY_KEYS = {
        "title",
        "status",
        "description",
        "front_matter",
        "rear_matter",
    }

    @classmethod
    def process_input_dict(cls, input_contents, env_yaml, product_cpes=None):
        """Normalize the raw group dictionary.

        Non-empty ``rules``, ``groups`` and ``values`` collections are turned
        into ID -> None mappings that are filled in later by load_entities().
        A single ``platform`` is merged into ``platforms``, and - unless
        ``cpe_platform_names`` is already present - every platform is parsed
        and its CPE platform ID recorded.
        """
        data = super(Group, cls).process_input_dict(input_contents, env_yaml, product_cpes)
        if data["rules"]:
            rule_ids = data["rules"]
            data["rules"] = {rid: None for rid in rule_ids}

        if data["groups"]:
            group_ids = data["groups"]
            data["groups"] = {gid: None for gid in group_ids}

        if data["values"]:
            value_ids = data["values"]
            data["values"] = {vid: None for vid in value_ids}

        if data["platform"]:
            data["platforms"].add(data["platform"])

        # parse platform definition and get CPEAL platform
        # if cpe_platform_names not already defined
        if data["platforms"] and not data["cpe_platform_names"]:
            for platform in data["platforms"]:
                cpe_platform = Platform.from_text(platform, product_cpes)
                cpe_platform = add_platform_if_not_defined(cpe_platform, product_cpes)
                data["cpe_platform_names"].add(cpe_platform.id_)
        return data

    def load_entities(self, rules_by_id, values_by_id, groups_by_id):
        """Replace empty rule/value/group placeholders with loaded entities.

        Rules and values must be present in their ``*_by_id`` mapping
        (KeyError otherwise). Subgroups that are not present in
        ``groups_by_id`` are dropped from this group instead.
        """
        for rid, val in self.rules.items():
            if not val:
                self.rules[rid] = rules_by_id[rid]

        for vid, val in self.values.items():
            if not val:
                self.values[vid] = values_by_id[vid]

        # Iterate over a snapshot of the keys because entries may be deleted.
        for gid in list(self.groups):
            val = self.groups.get(gid, None)
            if not val:
                try:
                    self.groups[gid] = groups_by_id[gid]
                except KeyError:
                    # Add only the groups we have compiled and loaded
                    del self.groups[gid]

    def represent_as_dict(self):
        """Return the dictionary form with children listed as sorted IDs."""
        yaml_contents = super(Group, self).represent_as_dict()

        if self.rules:
            yaml_contents["rules"] = sorted(list(self.rules.keys()))
        if self.groups:
            yaml_contents["groups"] = sorted(list(self.groups.keys()))
        if self.values:
            yaml_contents["values"] = sorted(list(self.values.keys()))

        return yaml_contents

    def to_xml_element(self, env_yaml=None):
        """Build and return the XCCDF ``Group`` element.

        Children are emitted in this order: title, description, warnings,
        platforms, requires, conflicts, values, rules (prioritized) and
        finally subgroups (prioritized).
        """
        group = ET.Element('{%s}Group' % XCCDF12_NS)
        group.set('id', OSCAP_GROUP + self.id_)
        title = ET.SubElement(group, '{%s}title' % XCCDF12_NS)
        title.text = self.title
        add_sub_element(group, 'description', XCCDF12_NS, self.description)
        add_warning_elements(group, self.warnings)

        # This is where references should be put if there are any
        # This is where rationale should be put if there are any

        # Sets have no stable iteration order; sort so that the generated
        # XML is reproducible between builds.
        for cpe_platform_name in sorted(self.cpe_platform_names):
            platform_el = ET.SubElement(group, "{%s}platform" % XCCDF12_NS)
            platform_el.set("idref", "#"+cpe_platform_name)

        add_nondata_subelements(
            group, "requires", "idref",
            list(map(lambda x: OSCAP_GROUP + x, self.requires)))
        add_nondata_subelements(
            group, "conflicts", "idref",
            list(map(lambda x: OSCAP_GROUP + x, self.conflicts)))
        for _value in self.values.values():
            if _value is not None:
                group.append(_value.to_xml_element())

        # Rules that install or remove packages affect remediation
        # of other rules.
        # When packages installed/removed rules come first:
        # The Rules are ordered in more logical way, and
        # remediation order is natural, first the package is installed, then configured.
        rules_in_group = list(self.rules.keys())
        regex = (r'(package_.*_(installed|removed))|' +
                 r'(service_.*_(enabled|disabled))|' +
                 r'install_smartcard_packages|' +
                 r'sshd_set_keepalive(_0)?|' +
                 r'sshd_set_idle_timeout$')
        priority_order = ["enable_authselect", "installed", "install_smartcard_packages", "removed",
                          "enabled", "disabled", "sshd_set_keepalive_0",
                          "sshd_set_keepalive", "sshd_set_idle_timeout"]
        rules_in_group = reorder_according_to_ordering(rules_in_group, priority_order, regex)

        # Add rules in priority order, first all packages installed, then removed,
        # followed by services enabled, then disabled
        for rule_id in rules_in_group:
            rule = self.rules.get(rule_id)
            if rule is not None:
                group.append(rule.to_xml_element(env_yaml))

        # Add the sub groups after any current level group rules.
        # As package installed/removed and service enabled/disabled rules are usually in
        # top level group, this ensures groups that further configure a package or service
        # are after rules that install or remove it.
        groups_in_group = list(self.groups.keys())
        priority_order = [
            # Make sure rpm_verify_(hashes|permissions|ownership) are run before any other rule.
            # Due to conflicts between rules rpm_verify_* rules and any rule that configures
            # stricter settings, like file_permissions_grub2_cfg and sudo_dedicated_group,
            # the rules deviating from the system default should be evaluated later.
            # So that in the end the system has contents, permissions and ownership reset, and
            # any deviations or stricter settings are applied by the rules in the profile.
            "software", "integrity", "integrity-software", "rpm_verification",

            # The account group has to precede audit group because
            # the rule package_screen_installed is desired to be executed before the rule
            # audit_rules_privileged_commands, otherwise the rule
            # does not catch newly installed screen binary during remediation
            # and report fail
            "accounts", "auditing",


            # The FIPS group should come before Crypto,
            # if we want to set a different (stricter) Crypto Policy than FIPS.
            "fips", "crypto",

            # The firewalld_activation must come before ruleset_modifications, otherwise
            # remediations for ruleset_modifications won't work
            "firewalld_activation", "ruleset_modifications",

            # Rules from group disabling_ipv6 must precede rules from configuring_ipv6,
            # otherwise the remediation prints error although it is successful
            "disabling_ipv6", "configuring_ipv6"
        ]
        groups_in_group = reorder_according_to_ordering(groups_in_group, priority_order)
        for group_id in groups_in_group:
            _group = self.groups[group_id]
            if _group is not None:
                group.append(_group.to_xml_element(env_yaml))

        return group

    def add_value(self, value):
        """Register ``value`` under its ID; None is ignored."""
        if value is None:
            return
        self.values[value.id_] = value

    def add_group(self, group, env_yaml=None, product_cpes=None):
        """Register ``group`` as a subgroup; None is ignored."""
        self._add_child(group, self.groups, env_yaml, product_cpes)

    def add_rule(self, rule, env_yaml=None, product_cpes=None):
        """Register ``rule`` in this group; None is ignored.

        When ``env_yaml`` is given, the platforms the rule inherits are
        parsed and their CPE platform IDs are recorded on the rule.
        """
        # Without this guard a None rule would be dereferenced below.
        if rule is None:
            return
        self._add_child(rule, self.rules, env_yaml, product_cpes)
        if env_yaml:
            for platform in rule.inherited_platforms:
                cpe_platform = Platform.from_text(platform, product_cpes)
                cpe_platform = add_platform_if_not_defined(cpe_platform, product_cpes)
                rule.inherited_cpe_platform_names.add(cpe_platform.id_)

    def _add_child(self, child, childs, env_yaml=None, product_cpes=None):
        """Store ``child`` in ``childs`` and pass this group's platforms down.

        The child inherits both the platforms declared on this group and the
        platforms this group inherited itself. None is ignored.
        """
        if child is None:
            return
        child.inherited_platforms.update(self.platforms, self.inherited_platforms)
        childs[child.id_] = child

    def __str__(self):
        return self.id_
||
636 | |||
637 | |||
def noop_rule_filterfunc(rule):
    """Rule filter that accepts every rule: always returns True."""
    return True
||
640 | |||
641 | |||
def rule_filter_from_def(filterdef):
    """Turn a filter definition into a callable taking a rule.

    ``filterdef`` is a Python expression given as a string. The returned
    function evaluates it with the rule's attributes (``rule.__dict__``) as
    the only visible variables and returns the result. When ``filterdef`` is
    None or an empty string, the accept-everything filter is returned.
    """
    if filterdef is not None and filterdef != "":
        def filterfunc(rule):
            # Remove globals for security and only expose
            # variables relevant to the rule
            # NOTE(review): eval() with emptied builtins is not a real
            # sandbox; filter definitions must come from trusted content.
            restricted_globals = {"__builtins__": None}
            return eval(filterdef, restricted_globals, rule.__dict__)
        return filterfunc

    return noop_rule_filterfunc
||
651 | |||
652 | |||
653 | class Rule(XCCDFEntity, Templatable): |
||
654 | """Represents XCCDF Rule |
||
655 | """ |
||
656 | KEYS = dict( |
||
657 | prodtype=lambda: "all", |
||
658 | description=lambda: "", |
||
659 | rationale=lambda: "", |
||
660 | severity=lambda: "", |
||
661 | references=lambda: dict(), |
||
662 | components=lambda: list(), |
||
663 | identifiers=lambda: dict(), |
||
664 | ocil_clause=lambda: None, |
||
665 | ocil=lambda: None, |
||
666 | oval_external_content=lambda: None, |
||
667 | fixtext=lambda: "", |
||
668 | checktext=lambda: "", |
||
669 | vuldiscussion=lambda: "", |
||
670 | srg_requirement=lambda: "", |
||
671 | warnings=lambda: list(), |
||
672 | conflicts=lambda: list(), |
||
673 | requires=lambda: list(), |
||
674 | policy_specific_content=lambda: dict(), |
||
675 | platform=lambda: None, |
||
676 | platforms=lambda: set(), |
||
677 | sce_metadata=lambda: dict(), |
||
678 | inherited_platforms=lambda: set(), |
||
679 | cpe_platform_names=lambda: set(), |
||
680 | inherited_cpe_platform_names=lambda: set(), |
||
681 | bash_conditional=lambda: None, |
||
682 | fixes=lambda: dict(), |
||
683 | **XCCDFEntity.KEYS |
||
684 | ) |
||
685 | KEYS.update(**Templatable.KEYS) |
||
686 | |||
687 | MANDATORY_KEYS = { |
||
688 | "title", |
||
689 | "description", |
||
690 | "rationale", |
||
691 | "severity", |
||
692 | } |
||
693 | |||
694 | GENERIC_FILENAME = "rule.yml" |
||
695 | ID_LABEL = "rule_id" |
||
696 | |||
697 | PRODUCT_REFERENCES = ("stigid", "cis",) |
||
698 | |||
def __init__(self, id_):
    """Create the rule with the given ID and no SCE metadata attached."""
    super(Rule, self).__init__(id_)
    # Starts out as None; from_yaml() assigns it when SCE metadata for
    # this rule is supplied.
    self.sce_metadata = None
||
702 | |||
703 | def __deepcopy__(self, memo): |
||
704 | cls = self.__class__ |
||
705 | result = cls.__new__(cls) |
||
706 | memo[id(self)] = result |
||
0 ignored issues
–
show
Comprehensibility
Best Practice
introduced
by
![]() |
|||
707 | for k, v in self.__dict__.items(): |
||
708 | # These are difficult to deep copy, so let's just re-use them. |
||
709 | if k != "template" and k != "local_env_yaml": |
||
710 | setattr(result, k, deepcopy(v, memo)) |
||
711 | else: |
||
712 | setattr(result, k, v) |
||
713 | return result |
||
714 | |||
@classmethod
def from_yaml(cls, yaml_file, env_yaml=None, product_cpes=None, sce_metadata=None):
    """Load a Rule from ``yaml_file`` and post-process it.

    After the generic loading, the rule's platforms are normalized into a
    set, warnings are validated, platforms are converted to CPE platform
    names (when applicable), policy specific content is loaded if the rule
    has none yet, SCE metadata is attached if available for the rule, and
    prodtype, identifiers and references are validated.
    """
    rule = super(Rule, cls).from_yaml(yaml_file, env_yaml, product_cpes)

    # The YAML file stores platforms as a list; turn them back into a set.
    rule.platforms = set(rule.platforms)

    check_warnings(rule)

    # Whatever is in rule.platform has to be in rule.platforms as well.
    if rule.platform is not None:
        rule.platforms.add(rule.platform)

    # Convert the platform names to CPE names.
    # This is only possible if an env_yaml was specified (otherwise there
    # would be no product CPEs to look up), and only relevant if the rule's
    # prodtype matches the product being built.
    product_applies = bool(env_yaml) and (
        env_yaml["product"] in parse_prodtype(rule.prodtype)
        or rule.prodtype == "all")
    # A rule that already carries cpe_platform_names (a compiled rule)
    # does not get its platforms evaluated again.
    if product_applies and product_cpes and not rule.cpe_platform_names:
        # parse platform definition and get CPEAL platform
        for platform in rule.platforms:
            cpe_platform = add_platform_if_not_defined(
                Platform.from_text(platform, product_cpes), product_cpes)
            rule.cpe_platform_names.add(cpe_platform.id_)

    # Only load policy specific content if rule doesn't have it defined yet
    if not rule.policy_specific_content:
        rule.load_policy_specific_content(yaml_file, env_yaml)

    if sce_metadata and rule.id_ in sce_metadata:
        metadata = sce_metadata[rule.id_]
        rule.sce_metadata = metadata
        metadata["relative_path"] = os.path.join(
            env_yaml["product"], "checks/sce", metadata['filename'])

    rule.validate_prodtype(yaml_file)
    rule.validate_identifiers(yaml_file)
    rule.validate_references(yaml_file)
    return rule
||
759 | |||
760 | def _verify_disa_cci_format(self): |
||
761 | cci_id = self.references.get("disa", None) |
||
762 | if not cci_id: |
||
763 | return |
||
764 | cci_ex = re.compile(r'^CCI-[0-9]{6}$') |
||
765 | for cci in cci_id.split(","): |
||
766 | if not cci_ex.match(cci): |
||
767 | raise ValueError("CCI '{}' is in the wrong format! " |
||
768 | "Format should be similar to: " |
||
769 | "CCI-XXXXXX".format(cci)) |
||
770 | self.references["disa"] = cci_id |
||
771 | |||
def normalize(self, product):
    """Make references, identifiers and the template specific to ``product``.

    Any exception raised by either step is re-raised as RuntimeError whose
    message names the rule and carries the original error text.
    """
    try:
        self.make_refs_and_identifiers_product_specific(product)
        self.make_template_product_specific(product)
    except Exception as exc:
        template = "Error normalizing '{rule}': {msg}"
        raise RuntimeError(template.format(rule=self.id_, msg=str(exc)))
||
782 | |||
def add_stig_references(self, stig_references):
    """Derive the ``stigref`` reference from the rule's ``stigid`` reference.

    Each comma-separated STIG ID is looked up in ``stig_references``; the
    non-empty results are joined with commas and stored as ``stigref``.
    Nothing is stored when the rule has no ``stigid`` or no ID resolves.
    """
    stig_ids = self.references.get("stigid")
    if not stig_ids:
        return

    looked_up = (stig_references.get(stig_id) for stig_id in stig_ids.split(","))
    resolved = [reference for reference in looked_up if reference]

    if resolved:
        self.references["stigref"] = ",".join(resolved)
||
797 | |||
def _get_product_only_references(self):
    """Return the subset of references whose type is product-only.

    A reference is selected when its key equals one of
    ``Rule.PRODUCT_REFERENCES`` or starts with such a type followed by ``@``
    (e.g. ``stigid@...``).
    """
    return {
        name: value
        for kind in Rule.PRODUCT_REFERENCES
        for name, value in self.references.items()
        if name == kind or name.startswith("{0}@".format(kind))
    }
||
807 | |||
def find_policy_specific_content(self, rule_root):
    """Return the set of ``*.yml`` paths found under ``<rule_root>/policy/*/``.

    Only files directly inside a first-level entry of the ``policy``
    directory are matched; the set is empty when nothing matches.
    """
    policy_root = os.path.join(rule_root, "policy")
    return {
        yml_path
        for policy_dir in glob.glob(os.path.join(policy_root, "*"))
        for yml_path in glob.glob(os.path.join(policy_dir, "*.yml"))
    }
816 | |||
def triage_policy_specific_content(self, product_name, filenames):
    """Pick one policy-specific file per policy for the given product.

    The policy name is the name of the directory that contains the file.
    A file whose path ends with ``<product_name>.yml`` is always stored
    (provided ``product_name`` is non-empty), replacing any earlier
    choice; a file ending with ``shared.yml`` is stored only when nothing
    has been stored for that policy yet.

    Returns a dict mapping policy name to the chosen file path.
    """
    product_file_suffix = product_name + ".yml"
    chosen = dict()
    for path in filenames:
        policy_name = os.path.basename(os.path.dirname(path))
        # The truthiness guard keeps an empty product from matching every
        # "*.yml" file.
        is_product_file = (
            bool(product_name) and path.endswith(product_file_suffix))
        is_first_shared = (
            path.endswith("shared.yml") and policy_name not in chosen)
        if is_product_file or is_first_shared:
            chosen[policy_name] = path
    return chosen
828 | |||
def read_policy_specific_content_file(self, env_yaml, filename):
    """Return the result of ``open_and_macro_expand(filename, env_yaml)``."""
    return open_and_macro_expand(filename, env_yaml)
832 | |||
def read_policy_specific_content(self, env_yaml, files):
    """Load the policy-specific content that applies to the current product.

    The product is ``env_yaml["product"]``, or the empty string when
    ``env_yaml`` is falsy. ``files`` is narrowed down with
    ``triage_policy_specific_content`` and every selected file is read
    with ``read_policy_specific_content_file``.

    Returns a dict mapping policy name to the data read from its file.
    """
    product = env_yaml["product"] if env_yaml else ""
    chosen_files = self.triage_policy_specific_content(product, files)
    return {
        policy: self.read_policy_specific_content_file(env_yaml, path)
        for policy, path in chosen_files.items()
    }
844 | |||
def load_policy_specific_content(self, rule_filename, env_yaml):
    """Populate ``self.policy_specific_content`` for this rule.

    Policy files are searched for next to ``rule_filename`` (in its
    directory). When any are found they are read for the current product;
    otherwise the attribute is set to an empty dict.
    """
    content_files = self.find_policy_specific_content(
        os.path.dirname(rule_filename))
    if content_files:
        self.policy_specific_content = self.read_policy_specific_content(
            env_yaml, content_files)
    else:
        self.policy_specific_content = dict()
853 | |||
def get_template_context(self, env_yaml):
    """Return the parent class's template context, extended for rules.

    When the rule has identifiers they are added to the context under the
    ``cce_identifiers`` key.
    """
    context = super(Rule, self).get_template_context(env_yaml)
    identifiers = self.identifiers
    if identifiers:
        context["cce_identifiers"] = identifiers
    return context
859 | |||
def make_refs_and_identifiers_product_specific(self, product):
    """Resolve ``@<product>``-qualified identifiers and references.

    The references are split into product-only references (as returned by
    ``_get_product_only_references``) and general ones. Identifiers,
    general references and product references are each passed through
    ``make_items_product_specific`` with the ``@<product>`` suffix and
    updated in place; only general references allow overwrites.
    Afterwards ``self.references`` holds the general references, the
    "disa" format is verified, and the product references are merged in.

    Raises ValueError when processing any of the three groups fails.
    """
    suffix = "@{0}".format(product)

    product_only = self._get_product_only_references()
    general = self.references.copy()
    for key in product_only:
        del general[key]

    # NOTE(review): _get_product_only_references() also selects keys that
    # equal a PRODUCT_REFERENCES name exactly, so those keys were just
    # removed from `general` and this check appears unable to fire --
    # confirm the intended validation.
    for ref in Rule.PRODUCT_REFERENCES:
        if ref in general:
            template = (
                "Unexpected reference identifier ({0}) without "
                "product qualifier ({0}@{1}) while building rule "
                "{2}")
            raise ValueError(template.format(ref, product, self.id_))

    groups = (
        ("identifiers", self.identifiers, False),
        ("general_references", general, True),
        ("product_references", product_only, False),
    )
    for label, items, allow_overwrites in groups:
        try:
            specific_items = make_items_product_specific(
                items, suffix, allow_overwrites)
        except ValueError as exc:
            raise ValueError(
                "Error processing {what} for rule '{rid}': {msg}"
                .format(what=label, rid=self.id_, msg=str(exc)))
        # Update in place so self.identifiers (the same object) changes too.
        items.clear()
        items.update(specific_items)

    self.references = general
    self._verify_disa_cci_format()
    self.references.update(product_only)
896 | |||
def validate_identifiers(self, yaml_file):
    """Validate the rule's identifiers section.

    Every identifier type and value has to be a non-blank string. For
    identifier types starting with ``cce`` the value must additionally
    pass ``is_cce_format_valid`` and, prefixed with ``CCE-``,
    ``is_cce_value_valid``.

    ``yaml_file`` is only used in error messages.

    Raises ValueError when the section is None or any check fails.
    """
    if self.identifiers is None:
        raise ValueError("Empty identifier section in file %s" % yaml_file)

    # Validate all identifiers are non-empty:
    for ident_type, ident_val in self.identifiers.items():
        if not isinstance(ident_type, str) or not isinstance(ident_val, str):
            raise ValueError("Identifiers and values must be strings: %s in file %s"
                             % (ident_type, yaml_file))
        if ident_val.strip() == "":
            raise ValueError("Identifiers must not be empty: %s in file %s"
                             % (ident_type, yaml_file))
        if ident_type[0:3] == 'cce':
            # The messages used to say "CEE"; the identifier kind is CCE.
            if not is_cce_format_valid(ident_val):
                raise ValueError("CCE Identifier format must be valid: invalid format '%s' for CCE '%s'"
                                 " in file '%s'" % (ident_val, ident_type, yaml_file))
            if not is_cce_value_valid("CCE-" + ident_val):
                raise ValueError("CCE Identifier value is not a valid checksum: invalid value '%s' for CCE '%s'"
                                 " in file '%s'" % (ident_val, ident_type, yaml_file))
916 | |||
def validate_references(self, yaml_file):
    """Validate the rule's references section.

    Every reference type and value has to be a non-blank string, and no
    comma-separated item of a value may carry leading or trailing
    whitespace. ``yaml_file`` is only used in error messages.

    Raises ValueError when the section is None or any check fails.
    """
    references = self.references
    if references is None:
        raise ValueError("Empty references section in file %s" % yaml_file)

    # First pass: types and emptiness, for all references.
    for ref_type, ref_val in references.items():
        both_strings = isinstance(ref_type, str) and isinstance(ref_val, str)
        if not both_strings:
            raise ValueError("References and values must be strings: %s in file %s"
                             % (ref_type, yaml_file))
        if not ref_val.strip():
            raise ValueError("References must not be empty: %s in file %s"
                             % (ref_type, yaml_file))

    # Second pass: whitespace around the comma-separated items.
    for ref_type, ref_val in references.items():
        if any(item != item.strip() for item in ref_val.split(",")):
            raise ValueError(
                "Comma-separated '{ref_type}' reference "
                "in {yaml_file} contains whitespace."
                .format(ref_type=ref_type, yaml_file=yaml_file))
937 | |||
def validate_prodtype(self, yaml_file):
    """Reject prodtype values whose comma-separated items carry whitespace.

    ``yaml_file`` is only used in the error message.

    Raises ValueError when any item of ``self.prodtype`` differs from its
    stripped form.
    """
    prodtype = self.prodtype
    if any(item != item.strip() for item in prodtype.split(",")):
        raise ValueError(
            "Comma-separated '{prodtype}' prodtype "
            "in {yaml_file} contains whitespace."
            .format(prodtype=prodtype, yaml_file=yaml_file))
946 | |||
def add_fixes(self, fixes):
    """Attach ``fixes`` to the rule by storing them in ``self.fixes``."""
    self.fixes = fixes
949 | |||
def _add_fixes_elements(self, rule_el):
    """Append one XCCDF ``fix`` element per entry of ``self.fixes``.

    Each value of ``self.fixes`` is unpacked as a ``(contents, config)``
    pair. The element gets the system matching the fix type, the rule ID,
    and every key from ``REMEDIATION_ELM_KEYS`` that has a truthy value
    in the config.
    """
    fix_tag = "{%s}fix" % XCCDF12_NS
    for fix_type, fix in self.fixes.items():
        fix_el = ET.SubElement(rule_el, fix_tag)
        fix_el.set("system", FIX_TYPE_TO_SYSTEM[fix_type])
        fix_el.set("id", self.id_)
        fix_contents, config = fix
        for key in ssg.build_remediations.REMEDIATION_ELM_KEYS:
            attribute_value = config[key]
            if attribute_value:
                fix_el.set(key, attribute_value)
        fix_el.text = fix_contents + "\n"
        # Expand shell variables and remediation functions
        # into corresponding XCCDF <sub> elements
        ssg.build_remediations.expand_xccdf_subs(fix_el, fix_type)
963 | |||
def to_xml_element(self, env_yaml=None):
    """Build and return the XCCDF 1.2 ``Rule`` element for this rule.

    The element is created unselected and is filled, in this order, with
    title, description, warnings, references, rationale, platform
    references, requires/conflicts, identifiers, fixes and finally the
    check elements (SCE when SCE metadata is present, OVAL, and OCIL).

    When ``env_yaml`` is truthy its ``reference_uris`` entry supplies the
    reference URIs; otherwise ``SSG_REF_URIS`` is used.
    """
    check_tag = "{%s}check" % XCCDF12_NS
    complex_check_tag = "{%s}complex-check" % XCCDF12_NS
    content_ref_tag = "{%s}check-content-ref" % XCCDF12_NS
    has_ocil = bool(self.ocil or self.ocil_clause)

    rule = ET.Element('{%s}Rule' % XCCDF12_NS)
    rule.set('selected', 'false')
    rule.set('id', OSCAP_RULE + self.id_)
    rule.set('severity', self.severity)
    add_sub_element(rule, 'title', XCCDF12_NS, self.title)
    add_sub_element(rule, 'description', XCCDF12_NS, self.description)
    add_warning_elements(rule, self.warnings)

    ref_uri_dict = env_yaml['reference_uris'] if env_yaml else SSG_REF_URIS
    add_reference_elements(rule, self.references, ref_uri_dict)

    add_sub_element(rule, 'rationale', XCCDF12_NS, self.rationale)

    for cpe_platform_name in sorted(self.cpe_platform_names):
        platform_el = ET.SubElement(rule, "{%s}platform" % XCCDF12_NS)
        platform_el.set("idref", "#" + cpe_platform_name)

    add_nondata_subelements(
        rule, "requires", "idref",
        [OSCAP_RULE + rule_id for rule_id in self.requires])
    add_nondata_subelements(
        rule, "conflicts", "idref",
        [OSCAP_RULE + rule_id for rule_id in self.conflicts])

    for ident_type, ident_val in self.identifiers.items():
        ident = ET.SubElement(rule, '{%s}ident' % XCCDF12_NS)
        if ident_type == 'cce':
            ident.set('system', cce_uri)
        ident.text = ident_val
    self._add_fixes_elements(rule)

    ocil_parent = rule
    check_parent = rule

    sce = self.sce_metadata
    if sce:
        # TODO: This is a hack, like the OVAL one further down. The check
        # sub-element is built here rather than in XSLT because of the
        # nature of the SCE metadata. Whether a rule has both SCE and OVAL
        # is not known here (linking has not happened yet), so content
        # authors have to annotate SCE content with a complex-check tag
        # when that is the case.
        if 'complex-check' in sce:
            # XCCDF allows EITHER one or more check elements OR a single
            # complex-check. OCIL has historically sat alongside OVAL in
            # an "OR" manner, so when OCIL data is present one more
            # complex-check parent with the OR operator is added first.
            if has_ocil:
                ocil_parent = ET.SubElement(ocil_parent, complex_check_tag)
                ocil_parent.set('operator', 'OR')

            check_parent = ET.SubElement(ocil_parent, complex_check_tag)
            check_parent.set('operator', sce['complex-check'])

        # Now, add the SCE check element to the tree.
        sce_check = ET.SubElement(check_parent, check_tag)
        sce_check.set("system", SCE_SYSTEM)

        if 'check-import' in sce:
            # A single string is normalized (and stored back) as a list.
            if isinstance(sce['check-import'], str):
                sce['check-import'] = [sce['check-import']]
            for entry in sce['check-import']:
                check_import = ET.SubElement(
                    sce_check, '{%s}check-import' % XCCDF12_NS)
                check_import.set('import-name', entry)
                check_import.text = None

        if 'check-export' in sce:
            # A single string is normalized (and stored back) as a list.
            if isinstance(sce['check-export'], str):
                sce['check-export'] = [sce['check-export']]
            for entry in sce['check-export']:
                export, value = entry.split('=')
                check_export = ET.SubElement(
                    sce_check, '{%s}check-export' % XCCDF12_NS)
                check_export.set('value-id', value)
                check_export.set('export-name', export)
                check_export.text = None

        sce_ref = ET.SubElement(sce_check, content_ref_tag)
        sce_ref.set("href", sce['relative_path'])

    oval_check = ET.SubElement(check_parent, check_tag)
    oval_check.set("system", oval_namespace)
    oval_ref = ET.SubElement(oval_check, content_ref_tag)
    if self.oval_external_content:
        oval_ref.set("href", self.oval_external_content)
    else:
        # TODO: This is pretty much a hack, oval ID will be the same as rule ID
        # and we don't want the developers to have to keep them in sync.
        # Therefore let's just add an OVAL ref of that ID.
        # TODO Can we not add the check element if the rule doesn't have an OVAL check?
        # At the moment, the check elements of rules without OVAL are removed by
        # the OVALFileLinker class.
        oval_ref.set("href", "oval-unlinked.xml")
        oval_ref.set("name", self.id_)

    # NOTE(review): the OCIL check is attached to check_parent rather than
    # ocil_parent, so with a complex-check it ends up inside the inner
    # complex-check -- confirm this is intended.
    if has_ocil and self.id_ != "security_patches_up_to_date":
        ocil_check = ET.SubElement(check_parent, check_tag)
        ocil_check.set("system", ocil_cs)
        ocil_ref = ET.SubElement(ocil_check, content_ref_tag)
        ocil_ref.set("href", "ocil-unlinked.xml")
        ocil_ref.set("name", self.id_ + "_ocil")

    return rule
1089 | |||
def to_ocil(self):
    """Build the OCIL elements that represent this rule.

    Returns a ``(questionnaire, action, boolean_question)`` tuple of
    elements whose IDs are derived from the rule ID with the ``_ocil``,
    ``_action`` and ``_question`` suffixes.

    Raises ValueError when the rule has neither ``ocil`` nor
    ``ocil_clause``.
    """
    if not (self.ocil or self.ocil_clause):
        raise ValueError("Rule {0} doesn't have OCIL".format(self.id_))

    def ocil_tag(local_name):
        return "{%s}%s" % (ocil_namespace, local_name)

    rule_id = self.id_

    # Create <questionnaire> for the rule
    questionnaire = ET.Element(ocil_tag("questionnaire"), id=rule_id + "_ocil")
    ET.SubElement(questionnaire, ocil_tag("title")).text = self.title
    actions = ET.SubElement(questionnaire, ocil_tag("actions"))
    ET.SubElement(actions, ocil_tag("test_action_ref")).text = rule_id + "_action"

    # Create <boolean_question_test_action> for the rule
    action = ET.Element(
        ocil_tag("boolean_question_test_action"),
        id=rule_id + "_action",
        question_ref=rule_id + "_question")
    for branch_name, outcome in (("when_true", "PASS"), ("when_false", "FAIL")):
        branch = ET.SubElement(action, ocil_tag(branch_name))
        ET.SubElement(branch, ocil_tag("result")).text = outcome

    # Create <boolean_question>
    boolean_question = ET.Element(
        ocil_tag("boolean_question"), id=rule_id + "_question")
    # TODO: The contents of <question_text> used to be broken in the legacy
    # XSLT implementation and the code below reproduces its results, which
    # allowed a transition without mass-editing rule YAML files.
    # We need to solve:
    # TODO: using variables (aka XCCDF Values) in OCIL content
    # TODO: using HTML formating tags eg. <pre> in OCIL content
    #
    # The "ocil" key in compiled rules contains HTML and XML elements but
    # OCIL question texts shouldn't contain them, therefore removing them.
    if self.ocil is None:
        ocil_without_tags = ""
    else:
        ocil_without_tags = re.sub(r"</?[^>]+>", "", self.ocil)
    # The "ocil" key also contains XML entities which would be escaped by
    # ET.SubElement(), so add_sub_element() is used to keep them unescaped.
    question_text = add_sub_element(
        boolean_question, "question_text", ocil_namespace,
        ocil_without_tags)
    # Unlike "ocil", the '<' and '>' characters of "ocil_clause" are meant
    # to be escaped. An empty ocil_clause yields a broken question, which
    # is in line with the legacy XSLT implementation.
    clause = self.ocil_clause or ""
    intro = question_text.text if question_text.text is not None else ""
    question_text.text = u"{0}\n Is it the case that {1}?\n ".format(
        intro, clause)
    return (questionnaire, action, boolean_question)
1147 | |||
def __hash__(self):
    """Hash the object by its ID, the same attribute ``__eq__`` compares."""
    identifier = self.id_
    return hash(identifier)
1152 | |||
def __eq__(self, other):
    """Return True when ``other`` is of this object's class and shares its ID."""
    if not isinstance(other, self.__class__):
        return False
    return self.id_ == other.id_
1155 | |||
def __ne__(self, other):
    """Return the negation of ``self == other``.

    Negating ``self != other`` here would call this method again and
    recurse without end, so the comparison is delegated to ``__eq__``.
    """
    return not self == other
1158 | |||
def __lt__(self, other):
    """Order objects by comparing their IDs."""
    own_id, other_id = self.id_, other.id_
    return own_id < other_id
1161 | |||
def __str__(self):
    """Return the object's ID as its string form."""
    return self.id_
1164 | |||
1165 | |||
class DirectoryLoader(object):
    """Base class that walks a guide directory tree and collects its content.

    A loader handles one directory: it sorts the directory entries into
    value files, rule files, a benchmark or group file and subdirectories,
    loads the benchmark/group, and spawns a child loader per subdirectory.
    Subclasses provide ``_get_new_loader``, ``_process_values`` and
    ``_process_rules``.
    """

    def __init__(self, profiles_dir, env_yaml, product_cpes):
        # Inputs shared by this loader and the child loaders it spawns.
        self.profiles_dir = profiles_dir
        self.env_yaml = env_yaml
        self.product = env_yaml["product"]
        self.product_cpes = product_cpes

        # Items discovered in the directory that is being processed.
        self.benchmark_file = None
        self.group_file = None
        self.rule_files = []
        self.value_files = []
        self.subdirectories = []

        # Group (or benchmark) loaded for this directory, and its parent.
        self.loaded_group = None
        self.parent_group = None

        # Entities gathered from this directory and its subdirectories.
        self.all_values = dict()
        self.all_rules = dict()
        self.all_groups = dict()

    def _collect_items_to_load(self, guide_directory):
        """Sort the entries of ``guide_directory`` into the loader's lists.

        Entries are visited in sorted order. Raises ValueError when a
        second benchmark or group file is registered on this loader;
        entries that fit no category are reported on stderr and skipped.
        """
        for entry in sorted(os.listdir(guide_directory)):
            entry_path = os.path.join(guide_directory, entry)
            extension = os.path.splitext(entry)[1]

            if extension == '.var':
                self.value_files.append(entry_path)
            elif entry == "benchmark.yml":
                if self.benchmark_file:
                    raise ValueError("Multiple benchmarks in one directory")
                self.benchmark_file = entry_path
            elif entry == "group.yml":
                if self.group_file:
                    raise ValueError("Multiple groups in one directory")
                self.group_file = entry_path
            elif extension == '.rule':
                self.rule_files.append(entry_path)
            elif is_rule_dir(entry_path):
                self.rule_files.append(get_rule_dir_yaml(entry_path))
            elif entry == "tests":
                # Entries named "tests" are neither recursed into nor reported.
                continue
            elif os.path.isdir(entry_path):
                self.subdirectories.append(entry_path)
            else:
                sys.stderr.write(
                    "Encountered file '%s' while recursing, extension '%s' "
                    "is unknown. Skipping..\n"
                    % (entry, extension)
                )

    def load_benchmark_or_group(self, guide_directory):
        """
        Load the benchmark or the group registered for this directory.

        A benchmark additionally gets the profiles from ``profiles_dir``
        (when set). A group is recorded in ``all_groups`` only when its
        prodtype contains "all" or the current product.

        Returns the loaded benchmark or group, or None when there is
        nothing to load or the group does not apply to the product.
        Raises ValueError when both a benchmark and a group file are set.
        """
        if self.group_file and self.benchmark_file:
            raise ValueError("A .benchmark file and a .group file were found in "
                             "the same directory '%s'" % (guide_directory))

        # we treat benchmark as a special form of group in the following code
        if self.benchmark_file:
            benchmark = Benchmark.from_yaml(
                self.benchmark_file, self.env_yaml, self.product_cpes
            )
            if self.profiles_dir:
                benchmark.add_profiles_from_dir(self.profiles_dir, self.env_yaml)
            return benchmark

        if self.group_file:
            group = Group.from_yaml(self.group_file, self.env_yaml, self.product_cpes)
            prodtypes = parse_prodtype(group.prodtype)
            applicable = "all" in prodtypes or self.product in prodtypes
            if not applicable:
                return None
            self.all_groups[group.id_] = group
            return group

        return None

    def _load_group_process_and_recurse(self, guide_directory):
        """Load this directory's group and, if there is one, its content.

        The group is attached to the parent group (when set) before the
        values, the subdirectories and finally the rules are processed.
        """
        self.loaded_group = self.load_benchmark_or_group(guide_directory)
        if not self.loaded_group:
            return

        if self.parent_group:
            self.parent_group.add_group(
                self.loaded_group, env_yaml=self.env_yaml, product_cpes=self.product_cpes)

        self._process_values()
        self._recurse_into_subdirs()
        self._process_rules()

    def process_directory_tree(self, start_dir, extra_group_dirs=None):
        """Process ``start_dir``, treating ``extra_group_dirs`` as extra subdirectories."""
        self._collect_items_to_load(start_dir)
        if extra_group_dirs:
            self.subdirectories.extend(extra_group_dirs)
        self._load_group_process_and_recurse(start_dir)

    def process_directory_trees(self, directories):
        """Process the first directory, with the remaining ones as extra group directories."""
        first_dir = directories[0]
        remaining_dirs = directories[1:]
        return self.process_directory_tree(first_dir, remaining_dirs)

    def _recurse_into_subdirs(self):
        """Process every subdirectory with a child loader and merge its results."""
        for subdir in self.subdirectories:
            child = self._get_new_loader()
            child.parent_group = self.loaded_group
            child.process_directory_tree(subdir)
            self.all_values.update(child.all_values)
            self.all_rules.update(child.all_rules)
            self.all_groups.update(child.all_groups)

    def _get_new_loader(self):
        """Return a loader for a subdirectory; subclasses must implement this."""
        raise NotImplementedError()

    def _process_values(self):
        """Handle the collected value files; subclasses must implement this."""
        raise NotImplementedError()

    def _process_rules(self):
        """Handle the collected rule files; subclasses must implement this."""
        raise NotImplementedError()

    def save_all_entities(self, base_dir):
        """Dump rules, groups, values, platforms and CPE items below ``base_dir``.

        Each kind gets its own subdirectory, which is created even when
        there is nothing of that kind to save.
        """
        def save_into(subdir, owner, attribute):
            destdir = os.path.join(base_dir, subdir)
            mkdir_p(destdir)
            collection = getattr(owner, attribute)
            if collection:
                self.save_entities(collection.values(), destdir)

        save_into("rules", self, "all_rules")
        save_into("groups", self, "all_groups")
        save_into("values", self, "all_values")
        save_into("platforms", self.product_cpes, "platforms")
        save_into("cpe_items", self.product_cpes, "cpes_by_id")

    def save_entities(self, entities, destdir):
        """Dump every entity to ``<destdir>/<entity id>.yml``."""
        if not entities:
            return
        for entity in entities:
            dest_filename = os.path.join(destdir, entity.id_ + ".yml")
            entity.dump_yaml(dest_filename)
1320 | |||
1321 | |||
class BuildLoader(DirectoryLoader):
    """Directory loader that builds values and rules from their YAML sources.

    On top of the base loader it can carry SCE metadata, STIG references
    and a rule-to-component mapping, which are parsed once and handed over
    to the child loaders created for subdirectories.
    """

    def __init__(
            self, profiles_dir, env_yaml, product_cpes,
            sce_metadata_path=None, stig_reference_path=None):
        """Set up the loader.

        ``sce_metadata_path`` is parsed as JSON when given and non-empty;
        ``stig_reference_path`` is passed to
        ``ssg.build_stig.map_versions_to_rule_ids`` when given.
        """
        super(BuildLoader, self).__init__(profiles_dir, env_yaml, product_cpes)

        self.sce_metadata = None
        if sce_metadata_path and os.path.getsize(sce_metadata_path):
            # Use a context manager so the file is closed even when the
            # JSON cannot be parsed.
            with open(sce_metadata_path, 'r') as sce_metadata_file:
                self.sce_metadata = json.load(sce_metadata_file)
        self.stig_references = None
        if stig_reference_path:
            self.stig_references = ssg.build_stig.map_versions_to_rule_ids(stig_reference_path)
        self.components_dir = None
        self.rule_to_components = self._load_components()

    def _load_components(self):
        """Return the rule-to-component mapping, or None without ``components_root``.

        The components directory is ``components_root`` resolved against
        ``product_dir`` (both taken from ``env_yaml``) and is remembered
        in ``self.components_dir``.
        """
        if "components_root" not in self.env_yaml:
            return None
        product_dir = self.env_yaml["product_dir"]
        components_root = self.env_yaml["components_root"]
        self.components_dir = os.path.abspath(
            os.path.join(product_dir, components_root))
        components = ssg.components.load(self.components_dir)
        rule_to_components = ssg.components.rule_component_mapping(
            components)
        return rule_to_components

    def _process_values(self):
        """Load every collected value file and add it to the loaded group."""
        for value_yaml in self.value_files:
            value = Value.from_yaml(value_yaml, self.env_yaml)
            self.all_values[value.id_] = value
            self.loaded_group.add_value(value)

    def _process_rule(self, rule):
        """Register ``rule`` with this loader and its group.

        Returns False (without registering) when the rule's prodtype
        contains neither "all" nor the current product, True otherwise.
        Raises ValueError when a component mapping is loaded and the rule
        is not part of it.
        """
        if self.rule_to_components is not None and rule.id_ not in self.rule_to_components:
            raise ValueError(
                "The rule '%s' isn't mapped to any component! Insert the "
                "rule ID to at least one file in '%s'." %
                (rule.id_, self.components_dir))
        prodtypes = parse_prodtype(rule.prodtype)
        if "all" not in prodtypes and self.product not in prodtypes:
            return False
        self.all_rules[rule.id_] = rule
        self.loaded_group.add_rule(
            rule, env_yaml=self.env_yaml, product_cpes=self.product_cpes)
        rule.normalize(self.env_yaml["product"])
        if self.stig_references:
            rule.add_stig_references(self.stig_references)
        if self.rule_to_components is not None:
            rule.components = self.rule_to_components[rule.id_]
        return True

    def _process_rules(self):
        """Load and register every collected rule file."""
        for rule_yaml in self.rule_files:
            try:
                rule = Rule.from_yaml(
                    rule_yaml, self.env_yaml, self.product_cpes, self.sce_metadata)
            except DocumentationNotComplete:
                # Happens on non-debug build when a rule is "documentation-incomplete"
                continue
            # The return value only tells whether the rule applies to the
            # product; nothing else is left to do for it either way.
            self._process_rule(rule)

    def _get_new_loader(self):
        """Return a child loader that shares this loader's parsed metadata."""
        loader = BuildLoader(
            self.profiles_dir, self.env_yaml, self.product_cpes)
        # Do it this way so we only have to parse the SCE metadata once.
        loader.sce_metadata = self.sce_metadata
        # Do it this way so we only have to parse the STIG references once.
        loader.stig_references = self.stig_references
        # Do it this way so we only have to parse the component metadata once.
        loader.rule_to_components = self.rule_to_components
        return loader

    def export_group_to_file(self, filename):
        """Write the loaded group to ``filename`` via its ``to_file`` method."""
        return self.loaded_group.to_file(filename)
1398 | |||
1399 | |||
1400 | class LinearLoader(object): |
||
1401 | def __init__(self, env_yaml, resolved_path): |
||
1402 | self.resolved_rules_dir = os.path.join(resolved_path, "rules") |
||
1403 | self.rules = dict() |
||
1404 | |||
1405 | self.resolved_profiles_dir = os.path.join(resolved_path, "profiles") |
||
1406 | self.profiles = dict() |
||
1407 | |||
1408 | self.resolved_groups_dir = os.path.join(resolved_path, "groups") |
||
1409 | self.groups = dict() |
||
1410 | |||
1411 | self.resolved_values_dir = os.path.join(resolved_path, "values") |
||
1412 | self.values = dict() |
||
1413 | |||
1414 | self.resolved_platforms_dir = os.path.join(resolved_path, "platforms") |
||
1415 | self.platforms = dict() |
||
1416 | |||
1417 | self.fixes_dir = os.path.join(resolved_path, "fixes") |
||
1418 | self.fixes = dict() |
||
1419 | |||
1420 | self.resolved_cpe_items_dir = os.path.join(resolved_path, "cpe_items") |
||
1421 | self.cpe_items = dict() |
||
1422 | |||
1423 | self.benchmark = None |
||
1424 | self.env_yaml = env_yaml |
||
1425 | self.product_cpes = ProductCPEs() |
||
1426 | |||
1427 | def find_first_groups_ids(self, start_dir): |
||
1428 | group_files = glob.glob(os.path.join(start_dir, "*", "group.yml")) |
||
1429 | group_ids = [fname.split(os.path.sep)[-2] for fname in group_files] |
||
1430 | return group_ids |
||
1431 | |||
1432 | def load_entities_by_id(self, filenames, destination, cls): |
||
1433 | for fname in filenames: |
||
1434 | entity = cls.from_yaml(fname, self.env_yaml, self.product_cpes) |
||
1435 | destination[entity.id_] = entity |
||
1436 | |||
1437 | def add_fixes_to_rules(self): |
||
1438 | for rule_id, rule_fixes in self.fixes.items(): |
||
1439 | self.rules[rule_id].add_fixes(rule_fixes) |
||
1440 | |||
1441 | def load_benchmark(self, directory): |
||
1442 | self.benchmark = Benchmark.from_yaml( |
||
1443 | os.path.join(directory, "benchmark.yml"), self.env_yaml, self.product_cpes) |
||
1444 | |||
1445 | self.benchmark.add_profiles_from_dir( |
||
1446 | self.resolved_profiles_dir, self.env_yaml, self.product_cpes) |
||
1447 | |||
1448 | benchmark_first_groups = self.find_first_groups_ids(directory) |
||
1449 | for gid in benchmark_first_groups: |
||
1450 | try: |
||
1451 | self.benchmark.add_group(self.groups[gid], self.env_yaml, self.product_cpes) |
||
1452 | except KeyError as exc: |
||
1453 | # Add only the groups we have compiled and loaded |
||
1454 | pass |
||
1455 | self.benchmark.unselect_empty_groups() |
||
1456 | |||
1457 | def load_compiled_content(self): |
||
1458 | self.product_cpes.load_cpes_from_directory_tree(self.resolved_cpe_items_dir, self.env_yaml) |
||
1459 | |||
1460 | self.fixes = ssg.build_remediations.load_compiled_remediations(self.fixes_dir) |
||
1461 | |||
1462 | filenames = glob.glob(os.path.join(self.resolved_rules_dir, "*.yml")) |
||
1463 | self.load_entities_by_id(filenames, self.rules, Rule) |
||
1464 | |||
1465 | filenames = glob.glob(os.path.join(self.resolved_groups_dir, "*.yml")) |
||
1466 | self.load_entities_by_id(filenames, self.groups, Group) |
||
1467 | |||
1468 | filenames = glob.glob(os.path.join(self.resolved_values_dir, "*.yml")) |
||
1469 | self.load_entities_by_id(filenames, self.values, Value) |
||
1470 | |||
1471 | filenames = glob.glob(os.path.join(self.resolved_platforms_dir, "*.yml")) |
||
1472 | self.load_entities_by_id(filenames, self.platforms, Platform) |
||
1473 | self.product_cpes.platforms = self.platforms |
||
1474 | |||
1475 | for g in self.groups.values(): |
||
1476 | g.load_entities(self.rules, self.values, self.groups) |
||
1477 | |||
1478 | def export_benchmark_to_xml(self): |
||
1479 | return self.benchmark.to_xml_element(self.env_yaml) |
||
1480 | |||
1481 | def export_benchmark_to_file(self, filename): |
||
1482 | register_namespaces() |
||
1483 | return self.benchmark.to_file(filename, self.env_yaml) |
||
1484 | |||
def export_ocil_to_xml(self):
    """Build the OCIL XML document for the loaded rules.

    The document consists of a ``generator`` element with product and
    schema metadata followed by the ``questionnaires``, ``test_actions``
    and ``questions`` containers. Rules having neither ``ocil`` nor
    ``ocil_clause`` are left out.

    Returns the root ``ocil`` element.
    """
    def ocil_tag(local_name):
        # Produce the namespace-qualified tag name "{namespace}local_name".
        return "{%s}%s" % (ocil_namespace, local_name)

    ocil_root = ET.Element(ocil_tag("ocil"))
    ocil_root.set('xmlns:xsi', xsi_namespace)
    ocil_root.set("xmlns:xhtml", xhtml_namespace)

    generator = ET.SubElement(ocil_root, ocil_tag("generator"))
    generator_metadata = (
        ("product_name", "build_shorthand.py from SCAP Security Guide"),
        ("product_version", "ssg: " + self.env_yaml["ssg_version_str"]),
        ("schema_version", "2.0"),
        ("timestamp", timestamp),
    )
    # Children are created in the order listed above.
    for local_name, text in generator_metadata:
        ET.SubElement(generator, ocil_tag(local_name)).text = text

    questionnaires = ET.SubElement(ocil_root, ocil_tag("questionnaires"))
    test_actions = ET.SubElement(ocil_root, ocil_tag("test_actions"))
    questions = ET.SubElement(ocil_root, ocil_tag("questions"))

    for rule in self.rules.values():
        # Only rules carrying some OCIL content contribute to the document.
        if rule.ocil or rule.ocil_clause:
            questionnaire, action, boolean_question = rule.to_ocil()
            questionnaires.append(questionnaire)
            test_actions.append(action)
            questions.append(boolean_question)
    return ocil_root
||
1509 | |||
def export_ocil_to_file(self, filename):
    """Serialize the OCIL document built by ``export_ocil_to_xml`` to ``filename``."""
    ocil_document = ET.ElementTree(self.export_ocil_to_xml())
    ocil_document.write(filename)
||
1514 | |||
1515 | |||
class Platform(XCCDFEntity):
    """Entity representing a platform serialized as a ``cpe-lang`` ``platform`` element.

    Besides the XML form (``xml_content``) the entity keeps the expression it
    was created from (``original_expression``) and the conditionals used by
    Bash and Ansible remediations.
    """

    KEYS = dict(
        name=lambda: "",
        original_expression=lambda: "",
        xml_content=lambda: "",
        bash_conditional=lambda: "",
        ansible_conditional=lambda: "",
        ** XCCDFEntity.KEYS
    )

    MANDATORY_KEYS = [
        "name",
        "xml_content",
        "original_expression",
        "bash_conditional",
        "ansible_conditional"
    ]

    prefix = "cpe-lang"
    ns = PREFIX_TO_NS[prefix]

    @classmethod
    def from_text(cls, expression, product_cpes):
        """Create a platform from a textual ``expression``.

        The expression is parsed by ``product_cpes.algebra`` and the ID of
        the parsed test is used both as the entity ID and as its name.

        Returns the new platform, or None when ``product_cpes`` is falsy.
        """
        if not product_cpes:
            return None
        parsed_test = product_cpes.algebra.parse(expression, simplify=True)
        platform_id = parsed_test.as_id()
        new_platform = cls(platform_id)
        new_platform.test = parsed_test
        product_cpes.add_resolved_cpe_items_from_platform(new_platform)
        new_platform.test.enrich_with_cpe_info(product_cpes)
        new_platform.name = platform_id
        new_platform.original_expression = expression
        new_platform.xml_content = new_platform.get_xml()
        for language in ("bash", "ansible"):
            new_platform.update_conditional_from_cpe_items(language, product_cpes)
        return new_platform

    def get_xml(self):
        """Return the ``platform`` XML element of this platform as a string."""
        platform_element = ET.Element("{%s}platform" % Platform.ns)
        platform_element.set('id', self.name)
        test_parent = platform_element
        if isinstance(self.test, CPEALCheckFactRef):
            # The test is a lone fact reference; wrap it in an artificial
            # AND logical test in order to adhere to the CPE specification.
            test_parent = ET.SubElement(
                platform_element, "{%s}logical-test" % CPEALLogicalTest.ns)
            test_parent.set('operator', 'AND')
            test_parent.set('negate', 'false')
        test_parent.append(self.test.to_xml_element())
        return ET.tostring(platform_element).decode()

    def to_xml_element(self):
        """Parse the stored ``xml_content`` and return the resulting element."""
        xml_element = ET.fromstring(self.xml_content)
        return xml_element

    def get_remediation_conditional(self, language):
        """Return the stored conditional for ``language`` ("bash" or "ansible").

        Raises AttributeError for any other language.
        """
        if language == "bash":
            return self.bash_conditional
        if language == "ansible":
            return self.ansible_conditional
        raise AttributeError("Invalid remediation language {0} specified.".format(language))

    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None, product_cpes=None):
        """Load a platform from ``yaml_file``.

        When ``product_cpes`` is supplied, the test object is additionally
        rebuilt from the stored original expression, so it can be used
        later, e.g. for comparison of platforms.
        """
        loaded = super(Platform, cls).from_yaml(yaml_file, env_yaml)
        if not product_cpes:
            return loaded
        loaded.test = product_cpes.algebra.parse(
            loaded.original_expression, simplify=True)
        product_cpes.add_resolved_cpe_items_from_platform(loaded)
        return loaded

    def get_fact_refs(self):
        """Return the symbols of the platform test."""
        symbols = self.test.get_symbols()
        return symbols

    def update_conditional_from_cpe_items(self, language, product_cpes):
        """Recompute the remediation conditional for ``language``.

        The test is enriched with CPE information from ``product_cpes``
        before the conditional is generated.

        Raises RuntimeError when ``language`` is neither "bash" nor "ansible".
        """
        self.test.enrich_with_cpe_info(product_cpes)
        if language == "bash":
            self.bash_conditional = self.test.to_bash_conditional()
            return
        if language == "ansible":
            self.ansible_conditional = self.test.to_ansible_conditional()
            return
        raise RuntimeError(
            "Platform remediations do not support the {0} language".format(language))

    def __eq__(self, other):
        """Platforms are equal when their tests are equal."""
        return isinstance(other, Platform) and self.test == other.test
||
1610 | |||
1611 | |||
def add_platform_if_not_defined(platform, product_cpes):
    """Return the registered platform equal to ``platform``, registering it if absent.

    ``product_cpes.platforms`` is searched for a platform comparing equal to
    the given one. The first match is returned; when there is none, the
    given platform is stored under its ``id_`` and returned.
    """
    registry = product_cpes.platforms
    # A dedicated sentinel tells "no match" apart from any stored value.
    not_found = object()
    existing = next(
        (candidate for candidate in registry.values() if platform == candidate),
        not_found)
    if existing is not not_found:
        return existing
    registry[platform.id_] = platform
    return platform
||
1619 |