Total Complexity | 379 |
Total Lines | 1934 |
Duplicated Lines | 1.24 % |
Coverage | 31.85% |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like ssg.build_yaml often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | 2 | from __future__ import absolute_import |
|
2 | 2 | from __future__ import print_function |
|
3 | |||
4 | 2 | from collections import defaultdict |
|
5 | 2 | from copy import deepcopy |
|
6 | 2 | import datetime |
|
7 | 2 | import json |
|
8 | 2 | import os |
|
9 | 2 | import os.path |
|
10 | 2 | import re |
|
11 | 2 | import sys |
|
12 | 2 | from xml.sax.saxutils import escape |
|
13 | 2 | import glob |
|
14 | |||
15 | 2 | import yaml |
|
16 | |||
17 | 2 | from .build_cpe import CPEDoesNotExist, parse_platform_definition |
|
18 | 2 | from .constants import XCCDF_REFINABLE_PROPERTIES, SCE_SYSTEM, ocil_cs, ocil_namespace, xhtml_namespace, xsi_namespace, timestamp |
|
19 | 2 | from .rules import get_rule_dir_id, get_rule_dir_yaml, is_rule_dir |
|
20 | 2 | from .rule_yaml import parse_prodtype |
|
21 | |||
22 | 2 | from .cce import is_cce_format_valid, is_cce_value_valid |
|
23 | 2 | from .yaml import DocumentationNotComplete, open_and_expand, open_and_macro_expand |
|
24 | 2 | from .utils import required_key, mkdir_p |
|
25 | |||
26 | 2 | from .xml import ElementTree as ET |
|
27 | 2 | from .shims import unicode_func |
|
28 | |||
29 | |||
def dump_yaml_preferably_in_original_order(dictionary, file_object):
    """Serialize `dictionary` as YAML into `file_object`.

    Insertion order of the keys is preserved when the installed PyYAML
    supports it; otherwise the default (sorted-keys) dump is produced.
    """
    try:
        return yaml.dump(dictionary, file_object, indent=4, sort_keys=False)
    except TypeError as exc:
        if "sort_keys" in str(exc):
            # Older versions of libyaml don't understand the sort_keys
            # kwarg - retry without it.
            return yaml.dump(dictionary, file_object, indent=4)
        raise exc
38 | |||
39 | |||
def add_sub_element(parent, tag, data):
    """
    Creates a new child element under parent with tag tag, and sets
    data as the content under the tag. In particular, data is a string
    to be parsed as an XML tree, allowing sub-elements of children to be
    added.

    If data should not be parsed as an XML tree, either escape the contents
    before passing into this function, or use ElementTree.SubElement().

    Returns the newly created subelement of type tag.
    """
    # Our YAML data contain XML and XHTML snippets; ET.SubElement() would
    # escape the < > characters as &lt; and &gt; and thus never create
    # child elements, so the markup is parsed explicitly instead.
    # TODO: Remove this function after we move to Markdown everywhere in SSG
    wrapped = unicode_func("<{0}>{1}</{0}>").format(tag, data)

    try:
        element = ET.fromstring(wrapped.encode("utf-8"))
    except Exception:
        raise RuntimeError(
            "Error adding subelement to an element '{0}' from string: '{1}'"
            .format(parent.tag, wrapped))

    parent.append(element)
    return element
68 | |||
69 | |||
def reorder_according_to_ordering(unordered, ordering, regex=None):
    """Return `unordered` rearranged so that items matching `ordering`
    entries come first, in the priority order given; everything else
    follows, sorted alphabetically.

    When `regex` is not supplied, it is built as an alternation of the
    `ordering` entries.
    """
    if regex is None:
        regex = "|".join(["({0})".format(item) for item in ordering])
    pattern = re.compile(regex)

    prioritized_candidates = [item for item in unordered if pattern.match(item)]
    remaining = set(unordered)

    result = []
    for priority in ordering:
        for candidate in prioritized_candidates:
            # Substring membership: one priority entry may claim
            # several candidates, each taken at most once.
            if priority in candidate and candidate in remaining:
                result.append(candidate)
                remaining.remove(candidate)
    result.extend(sorted(remaining))
    return result
86 | |||
87 | |||
def add_warning_elements(element, warnings):
    """Append one <warning> child element to `element` per warning.

    `warnings` is a list of single-pair dicts mapping a category to the
    warning text. The list-of-dicts shape is used because multiple
    warnings may legitimately share the same category, e.g.:

        warnings:
            - general: Some general warning
            - general: Some other general warning
            - general: |-
                Some really long multiline general warning
    """
    for warning_dict in warnings:
        text = list(warning_dict.values())[0]
        category = list(warning_dict.keys())[0]
        warning = add_sub_element(element, "warning", text)
        warning.set("category", category)
103 | |||
104 | |||
def add_nondata_subelements(element, subelement, attribute, attr_data):
    """Add multiple iterations of a subelement that carries a single
    attribute and no text content.

    For example, <requires id="my_required_id"/>.
    """
    for value in attr_data:
        child = ET.SubElement(element, subelement)
        child.set(attribute, value)
111 | |||
112 | |||
def check_warnings(xccdf_structure):
    """Raise ValueError unless every entry of `xccdf_structure.warnings`
    is a dictionary holding exactly one key/value pair."""
    for entry in xccdf_structure.warnings:
        if len(entry) == 1:
            continue
        msg = "Only one key/value pair should exist for each warnings dictionary"
        raise ValueError(msg)
118 | |||
119 | |||
class SelectionHandler(object):
    """Tracks rule selections, variable settings and rule refinements.

    This is a mixin: error messages reference self.id_, which must be
    provided by the class it is combined with (e.g. an XCCDF entity).
    """
    def __init__(self):
        # rule ID -> list of (property, value) refinement pairs
        self.refine_rules = defaultdict(list)
        # variable name -> chosen selector
        self.variables = dict()
        self.unselected = []
        self.selected = []

    @property
    def selections(self):
        """Serialize all recorded selections back to the flat string form."""
        result = []
        for entry in self.selected:
            result.append(str(entry))
        for entry in self.unselected:
            result.append("!" + str(entry))
        for varname in self.variables.keys():
            result.append(varname + "=" + self.variables.get(varname))
        for rule, refinements in self.refine_rules.items():
            for prop, val in refinements:
                result.append(
                    "{rule}.{property}={value}"
                    .format(rule=rule, property=prop, value=val))
        return result

    @selections.setter
    def selections(self, entries):
        for entry in entries:
            self.apply_selection(entry)

    def apply_selection(self, item):
        """Classify one selection string and record it.

        Formats: "rule.prop=value" (refinement), "var=value" (variable),
        "!rule" (unselection), anything else is a plain rule selection.
        """
        if "." in item:
            self._apply_refinement(item)
        elif "=" in item:
            varname, value = item.split("=", 1)
            self.variables[varname] = value
        elif item.startswith("!"):
            self.unselected.append(item[1:])
        else:
            self.selected.append(item)

    def _apply_refinement(self, item):
        # Record a "rule.property=value" refinement, rejecting properties
        # that XCCDF does not allow to be refined.
        rule, refinement = item.split(".", 1)
        property_, value = refinement.split("=", 1)
        if property_ not in XCCDF_REFINABLE_PROPERTIES:
            msg = ("Property '{property_}' cannot be refined. "
                   "Rule properties that can be refined are {refinables}. "
                   "Fix refinement '{rule_id}.{property_}={value}' in profile '{profile}'."
                   .format(property_=property_, refinables=XCCDF_REFINABLE_PROPERTIES,
                           rule_id=rule, value=value, profile=self.id_)
                   )
            raise ValueError(msg)
        self.refine_rules[rule].append((property_, value))

    def _subtract_refinements(self, extended_refinements):
        """
        Given a dict of rule refinements from the extended profile,
        "undo" every refinement prefixed with '!' in this profile.
        """
        negated = [rule for rule in self.refine_rules if rule.startswith("!")]
        for rule in negated:
            for prop, val in self.refine_rules[rule]:
                extended_refinements[rule[1:]].remove((prop, val))
            del self.refine_rules[rule]
        return extended_refinements

    def update_with(self, rhs):
        """Merge the selections of `rhs` (an extended profile or control)
        into this handler; local settings win over inherited ones."""
        inherited_selects = set(rhs.selected) - set(self.selected)
        self.selected.extend(list(inherited_selects))

        merged_variables = dict(rhs.variables)
        merged_variables.update(self.variables)
        self.variables = merged_variables

        inherited_refinements = deepcopy(rhs.refine_rules)
        merged_refinements = self._subtract_refinements(inherited_refinements)
        merged_refinements.update(self.refine_rules)
        self.refine_rules = merged_refinements
194 | |||
class XCCDFEntity(object):
    """
    This class can load itself from a YAML with Jinja macros,
    and it can also save itself to YAML.

    It is supposed to work with the content in the project,
    when entities are defined in the benchmark tree,
    and they are compiled into flat YAMLs to the build directory.
    """
    # Attribute name -> zero-argument factory producing its default value.
    KEYS = dict(
        id_=lambda: "",
        definition_location=lambda: "",
    )

    # Keys that must be present in the defining YAML.
    MANDATORY_KEYS = set()

    # When the YAML file carries this generic name (e.g. "rule.yml"),
    # the entity ID is taken from the parent directory name instead.
    GENERIC_FILENAME = ""
    ID_LABEL = "id"

    def __init__(self, id_):
        super(XCCDFEntity, self).__init__()
        self._assign_defaults()
        self.id_ = id_

    def _assign_defaults(self):
        """Initialize every attribute listed in KEYS to its default value."""
        for key, default in self.KEYS.items():
            default_val = default()
            # A RuntimeError default marks a key without a usable default;
            # represent it as None until a real value gets assigned.
            if isinstance(default_val, RuntimeError):
                default_val = None
            setattr(self, key, default_val)

    @classmethod
    def get_instance_from_full_dict(cls, data):
        """
        Given a defining dictionary, produce an instance
        by treating all dict elements as attributes.

        Extend this if you want tight control over the instance creation process.
        """
        entity = cls(data["id_"])
        for key, value in data.items():
            setattr(entity, key, value)
        return entity

    @classmethod
    def process_input_dict(cls, input_contents, env_yaml):
        """
        Take the contents of the definition as a dictionary, and
        add defaults or raise errors if a required member is not present.

        Extend this if you want to add, remove or alter the result
        that will constitute the new instance.
        """
        data = dict()

        for key, default in cls.KEYS.items():
            if key in input_contents:
                data[key] = input_contents[key]
                # Consume the key so that leftover (unknown) keys
                # can be detected by the caller afterwards.
                del input_contents[key]
                continue

            if key not in cls.MANDATORY_KEYS:
                data[key] = cls.KEYS[key]()
            else:
                msg = (
                    "Key '{key}' is mandatory for definition of '{class_name}'."
                    .format(key=key, class_name=cls.__name__))
                raise ValueError(msg)

        return data

    @classmethod
    def parse_yaml_into_processed_dict(cls, yaml_file, env_yaml=None):
        """
        Given yaml filename and environment info, produce a dictionary
        that defines the instance to be created.
        This wraps :meth:`process_input_dict` and it adds generic keys on the top:

        - `id_` as the entity ID that is deduced either from the filename,
          or from the parent directory name.
        - `definition_location` as the original location where the entity got defined.
        """
        file_basename = os.path.basename(yaml_file)
        entity_id = file_basename.split(".")[0]
        if file_basename == cls.GENERIC_FILENAME:
            entity_id = os.path.basename(os.path.dirname(yaml_file))

        if env_yaml:
            env_yaml[cls.ID_LABEL] = entity_id
        yaml_data = open_and_macro_expand(yaml_file, env_yaml)

        try:
            processed_data = cls.process_input_dict(yaml_data, env_yaml)
        except ValueError as exc:
            msg = (
                "Error processing {yaml_file}: {exc}"
                .format(yaml_file=yaml_file, exc=str(exc)))
            raise ValueError(msg)

        # process_input_dict consumed every known key - anything left is
        # a typo or an unsupported key in the definition.
        if yaml_data:
            msg = (
                "Unparsed YAML data in '{yaml_file}': {keys}"
                .format(yaml_file=yaml_file, keys=list(yaml_data.keys())))
            raise RuntimeError(msg)

        if not processed_data.get("definition_location", ""):
            processed_data["definition_location"] = yaml_file

        processed_data["id_"] = entity_id

        return processed_data

    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None):
        """Create an instance from a YAML definition file."""
        yaml_file = os.path.normpath(yaml_file)

        local_env_yaml = None
        if env_yaml:
            # Work on a copy - parse_yaml_into_processed_dict mutates it.
            local_env_yaml = dict()
            local_env_yaml.update(env_yaml)

        try:
            data_dict = cls.parse_yaml_into_processed_dict(yaml_file, local_env_yaml)
        except DocumentationNotComplete:
            raise
        except Exception as exc:
            # Bug fix: the format string previously never referenced the
            # 'filename' argument, so the offending file name was missing
            # from the error message.
            msg = (
                "Error loading a {class_name} from {filename}: {error}"
                .format(class_name=cls.__name__, filename=yaml_file, error=str(exc)))
            raise RuntimeError(msg)

        result = cls.get_instance_from_full_dict(data_dict)

        return result

    def represent_as_dict(self):
        """
        Produce a dict representation of the class.

        Extend this method if you need the representation to be different from the object.
        """
        data = dict()
        for key in self.KEYS:
            # (The former `if value or True` guard was always true,
            # so every key is included unconditionally.)
            data[key] = getattr(self, key)
        # The ID is not part of the representation - callers key on it.
        del data["id_"]
        return data

    def dump_yaml(self, file_name, documentation_complete=True):
        """Serialize this entity into a YAML file at `file_name`."""
        to_dump = self.represent_as_dict()
        to_dump["documentation_complete"] = documentation_complete
        with open(file_name, "w+") as f:
            dump_yaml_preferably_in_original_order(to_dump, f)

    def to_xml_element(self):
        # Subclasses must provide their own XML serialization.
        raise NotImplementedError()
352 | |||
353 | |||
class Profile(XCCDFEntity, SelectionHandler):
    """Represents XCCDF profile
    """
    # Attribute name -> factory producing its default value.
    KEYS = dict(
        title=lambda: "",
        description=lambda: "",
        extends=lambda: "",
        metadata=lambda: None,
        reference=lambda: None,
        selections=lambda: list(),
        platforms=lambda: set(),
        cpe_names=lambda: set(),
        platform=lambda: None,
        filter_rules=lambda: "",
        ** XCCDFEntity.KEYS
    )

    # Keys that must be present in the profile YAML definition.
    MANDATORY_KEYS = {
        "title",
        "description",
        "selections",
    }

    @classmethod
    def process_input_dict(cls, input_contents, env_yaml):
        # Fold the singular "platform" key into the "platforms" set and
        # resolve every platform to its CPE name using the product data.
        input_contents = super(Profile, cls).process_input_dict(input_contents, env_yaml)

        platform = input_contents.get("platform")
        if platform is not None:
            input_contents["platforms"].add(platform)

        if env_yaml:
            for platform in input_contents["platforms"]:
                try:
                    new_cpe_name = env_yaml["product_cpes"].get_cpe_name(platform)
                    input_contents["cpe_names"].add(new_cpe_name)
                except CPEDoesNotExist:
                    # Re-raise with a message naming the offending platform.
                    msg = (
                        "Unsupported platform '{platform}' in a profile."
                        .format(platform=platform))
                    raise CPEDoesNotExist(msg)

        return input_contents

    @property
    def rule_filter(self):
        # Callable deciding whether a rule belongs to this profile;
        # without a filter definition, every rule is accepted.
        if self.filter_rules:
            return rule_filter_from_def(self.filter_rules)
        else:
            return noop_rule_filterfunc

    def to_xml_element(self):
        """Serialize this profile to an XCCDF <Profile> XML element."""
        element = ET.Element('Profile')
        element.set("id", self.id_)
        if self.extends:
            element.set("extends", self.extends)
        title = add_sub_element(element, "title", self.title)
        title.set("override", "true")
        desc = add_sub_element(element, "description", self.description)
        desc.set("override", "true")

        if self.reference:
            add_sub_element(element, "reference", escape(self.reference))

        # One <platform> per resolved CPE name.
        for cpe_name in self.cpe_names:
            plat = ET.SubElement(element, "platform")
            plat.set("idref", cpe_name)

        for selection in self.selected:
            select = ET.Element("select")
            select.set("idref", selection)
            select.set("selected", "true")
            element.append(select)

        for selection in self.unselected:
            unselect = ET.Element("select")
            unselect.set("idref", selection)
            unselect.set("selected", "false")
            element.append(unselect)

        for value_id, selector in self.variables.items():
            refine_value = ET.Element("refine-value")
            refine_value.set("idref", value_id)
            refine_value.set("selector", selector)
            element.append(refine_value)

        # All refinements of one rule are merged into a single
        # <refine-rule> element carrying one attribute per refinement.
        for refined_rule, refinement_list in self.refine_rules.items():
            refine_rule = ET.Element("refine-rule")
            refine_rule.set("idref", refined_rule)
            for refinement in refinement_list:
                refine_rule.set(refinement[0], refinement[1])
            element.append(refine_rule)

        return element

    def get_rule_selectors(self):
        # Both positively and negatively selected rule IDs.
        return self.selected + self.unselected

    def get_variable_selectors(self):
        return self.variables

    def validate_refine_rules(self, rules):
        """Raise ValueError for refinements of unknown or unselected rules."""
        existing_rule_ids = [r.id_ for r in rules]
        for refine_rule, refinement_list in self.refine_rules.items():
            # Take first refinement to illustrate where the error is
            # all refinements in list are invalid, so it doesn't really matter
            a_refinement = refinement_list[0]

            if refine_rule not in existing_rule_ids:
                msg = (
                    "You are trying to refine a rule that doesn't exist. "
                    "Rule '{rule_id}' was not found in the benchmark. "
                    "Please check all rule refinements for rule: '{rule_id}', for example: "
                    "- {rule_id}.{property_}={value}' in profile {profile_id}."
                    .format(rule_id=refine_rule, profile_id=self.id_,
                            property_=a_refinement[0], value=a_refinement[1])
                )
                raise ValueError(msg)

            if refine_rule not in self.get_rule_selectors():
                msg = ("- {rule_id}.{property_}={value}' in profile '{profile_id}' is refining "
                       "a rule that is not selected by it. The refinement will not have any "
                       "noticeable effect. Either select the rule or remove the rule refinement."
                       .format(rule_id=refine_rule, property_=a_refinement[0],
                               value=a_refinement[1], profile_id=self.id_)
                       )
                raise ValueError(msg)

    def validate_variables(self, variables):
        """Raise ValueError if the profile refers to unknown variables
        or uses selectors the variables do not define."""
        variables_by_id = dict()
        for var in variables:
            variables_by_id[var.id_] = var

        for var_id, our_val in self.variables.items():
            if var_id not in variables_by_id:
                all_vars_list = [" - %s" % v for v in variables_by_id.keys()]
                msg = (
                    "Value '{var_id}' in profile '{profile_name}' is not known. "
                    "We know only variables:\n{var_names}"
                    .format(
                        var_id=var_id, profile_name=self.id_,
                        var_names="\n".join(sorted(all_vars_list)))
                )
                raise ValueError(msg)

            allowed_selectors = [str(s) for s in variables_by_id[var_id].options.keys()]
            if our_val not in allowed_selectors:
                msg = (
                    "Value '{var_id}' in profile '{profile_name}' "
                    "uses the selector '{our_val}'. "
                    "This is not possible, as only selectors {all_selectors} are available. "
                    "Either change the selector used in the profile, or "
                    "add the selector-value pair to the variable definition."
                    .format(
                        var_id=var_id, profile_name=self.id_, our_val=our_val,
                        all_selectors=allowed_selectors,
                    )
                )
                raise ValueError(msg)

    def validate_rules(self, rules, groups):
        """Raise ValueError if the profile selects groups or unknown rules."""
        existing_rule_ids = [r.id_ for r in rules]
        rule_selectors = self.get_rule_selectors()
        for id_ in rule_selectors:
            if id_ in groups:
                msg = (
                    "You have selected a group '{group_id}' instead of a "
                    "rule. Groups have no effect in the profile and are not "
                    "allowed to be selected. Please remove '{group_id}' "
                    "from profile '{profile_id}' before proceeding."
                    .format(group_id=id_, profile_id=self.id_)
                )
                raise ValueError(msg)
            if id_ not in existing_rule_ids:
                msg = (
                    "Rule '{rule_id}' was not found in the benchmark. Please "
                    "remove rule '{rule_id}' from profile '{profile_id}' "
                    "before proceeding."
                    .format(rule_id=id_, profile_id=self.id_)
                )
                raise ValueError(msg)

    def __sub__(self, other):
        """Return a profile containing the selections and variable
        settings present in self but not in other (set difference)."""
        profile = Profile(self.id_)
        profile.title = self.title
        profile.description = self.description
        profile.extends = self.extends
        profile.platforms = self.platforms
        profile.platform = self.platform
        profile.selected = list(set(self.selected) - set(other.selected))
        profile.selected.sort()
        profile.unselected = list(set(self.unselected) - set(other.unselected))
        # Keep a variable only if other lacks it or sets a different selector.
        profile.variables = dict ((k, v) for (k, v) in self.variables.items()
                                  if k not in other.variables or v != other.variables[k])
        return profile
549 | |||
550 | |||
class ResolvableProfile(Profile):
    """A profile that can resolve its `extends` chain and control
    references into a flat, explicit list of selections."""

    def __init__(self, * args, ** kwargs):
        super(ResolvableProfile, self).__init__(* args, ** kwargs)
        # Set to True once resolve() has completed, to avoid re-resolution.
        self.resolved = False

    def _controls_ids_to_controls(self, controls_manager, policy_id, control_id_list):
        # Look up each control ID within the given policy.
        items = [controls_manager.get_control(policy_id, cid) for cid in control_id_list]
        return items

    def resolve_controls(self, controls_manager):
        # Hook for subclasses that support control references; the base
        # implementation has nothing to resolve.
        pass

    def extend_by(self, extended_profile):
        """Merge the selections of the profile this one extends."""
        self.update_with(extended_profile)

    def resolve_selections_with_rules(self, rules_by_id):
        """Drop selections of unknown rules and of rules rejected by
        this profile's rule filter."""
        selections = set()
        for rid in self.selected:
            if rid not in rules_by_id:
                continue
            rule = rules_by_id[rid]
            if not self.rule_filter(rule):
                continue
            selections.add(rid)
        self.selected = list(selections)

    def resolve(self, all_profiles, rules_by_id, controls_manager=None):
        """Flatten this profile: resolve controls, filter selections,
        recursively merge the extended profile, then subtract unselections.

        The order of these steps is significant. Raises RuntimeError when
        the extended profile is unknown and ValueError when a selected
        rule is not available.
        """
        if self.resolved:
            return

        if controls_manager:
            self.resolve_controls(controls_manager)

        self.resolve_selections_with_rules(rules_by_id)

        if self.extends:
            if self.extends not in all_profiles:
                msg = (
                    "Profile {name} extends profile {extended}, but "
                    "only profiles {known_profiles} are available for resolution."
                    .format(name=self.id_, extended=self.extends,
                            known_profiles=list(all_profiles.keys())))
                raise RuntimeError(msg)
            extended_profile = all_profiles[self.extends]
            # Resolve the parent first, then merge it in.
            extended_profile.resolve(all_profiles, rules_by_id, controls_manager)

            self.extend_by(extended_profile)

        # Unselections override selections, including inherited ones.
        self.selected = [s for s in set(self.selected) if s not in self.unselected]

        self.unselected = []
        self.extends = None

        self.selected = sorted(self.selected)

        for rid in self.selected:
            if rid not in rules_by_id:
                msg = (
                    "Rule {rid} is selected by {profile}, but the rule is not available. "
                    "This may be caused by a discrepancy of prodtypes."
                    .format(rid=rid, profile=self.id_))
                raise ValueError(msg)

        self.resolved = True
615 | |||
616 | |||
class ProfileWithInlinePolicies(ResolvableProfile):
    """A resolvable profile whose selections may reference policy
    controls inline, using the "policy_id:control_id" syntax."""

    def __init__(self, * args, ** kwargs):
        super(ProfileWithInlinePolicies, self).__init__(* args, ** kwargs)
        # policy ID -> list of control ID strings referenced by the profile
        self.controls_by_policy = defaultdict(list)

    def apply_selection(self, item):
        # ":" is the delimiter for controls but not when the item is a variable
        is_control_reference = ":" in item and "=" not in item
        if not is_control_reference:
            super(ProfileWithInlinePolicies, self).apply_selection(item)
            return
        policy_id, control_id = item.split(":", 1)
        self.controls_by_policy[policy_id].append(control_id)

    def _process_controls_ids_into_controls(self, controls_manager, policy_id, controls_ids):
        """Expand control IDs ("all", "all:<level>", or plain IDs) into
        the corresponding control objects."""
        controls = []
        for cid in controls_ids:
            if cid.startswith("all"):
                if ":" in cid:
                    # "all:<level>" selects every control of the given level.
                    _, level_id = cid.split(":", 1)
                    controls.extend(
                        controls_manager.get_all_controls_of_level(policy_id, level_id))
                else:
                    # Plain "all" selects every control of the policy.
                    controls.extend(
                        controls_manager.get_all_controls(policy_id))
            else:
                controls.extend(
                    self._controls_ids_to_controls(controls_manager, policy_id, [cid]))
        return controls

    def resolve_controls(self, controls_manager):
        """Merge the selections of every referenced control into this profile."""
        for policy_id, controls_ids in self.controls_by_policy.items():
            expanded = self._process_controls_ids_into_controls(
                controls_manager, policy_id, controls_ids)

            for control in expanded:
                self.update_with(control)
652 | |||
653 | |||
class Value(XCCDFEntity):
    """Represents XCCDF Value
    """
    # Attribute name -> factory producing its default value.
    KEYS = dict(
        title=lambda: "",
        description=lambda: "",
        type=lambda: "",
        operator=lambda: "equals",
        interactive=lambda: False,
        options=lambda: dict(),
        warnings=lambda: list(),
        ** XCCDFEntity.KEYS
    )

    # Keys that must be present in the value YAML definition.
    MANDATORY_KEYS = {
        "title",
        "description",
        "type",
    }

    @classmethod
    def process_input_dict(cls, input_contents, env_yaml):
        """Normalize `interactive` to a bool and validate the operator."""
        raw_interactive = input_contents.get("interactive", "false")
        input_contents["interactive"] = raw_interactive.lower() == "true"

        data = super(Value, cls).process_input_dict(input_contents, env_yaml)

        possible_operators = ["equals", "not equal", "greater than",
                              "less than", "greater than or equal",
                              "less than or equal", "pattern match"]

        if data["operator"] not in possible_operators:
            raise ValueError(
                "Found an invalid operator value '%s'. "
                "Expected one of: %s"
                % (data["operator"], ", ".join(possible_operators))
            )

        return data

    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None):
        """Load a Value and verify its warnings structure."""
        value = super(Value, cls).from_yaml(yaml_file, env_yaml)
        check_warnings(value)
        return value

    def to_xml_element(self):
        """Serialize this Value as an XCCDF <Value> XML element."""
        value = ET.Element('Value')
        value.set('id', self.id_)
        value.set('type', self.type)
        # "equals" and non-interactive are the defaults - omit them.
        if self.operator != "equals":
            value.set('operator', self.operator)
        if self.interactive:
            value.set('interactive', 'true')
        title = ET.SubElement(value, 'title')
        title.text = self.title
        add_sub_element(value, 'description', self.description)
        add_warning_elements(value, self.warnings)

        for selector, option in self.options.items():
            # do not confuse Value with big V with value with small v
            # value is child element of Value
            child = ET.SubElement(value, 'value')
            # by XCCDF spec, default value is value without selector
            if selector != "default":
                child.set('selector', str(selector))
            child.text = str(option)

        return value

    def to_file(self, file_name):
        """Write the XML representation of this Value into `file_name`."""
        root = self.to_xml_element()
        ET.ElementTree(root).write(file_name)
730 | |||
731 | |||
732 | 2 | class Benchmark(XCCDFEntity): |
|
733 | """Represents XCCDF Benchmark |
||
734 | """ |
||
    # Attribute name -> factory producing its default value.
    KEYS = dict(
        title=lambda: "",
        status=lambda: "",
        description=lambda: "",
        notice_id=lambda: "",
        notice_description=lambda: "",
        front_matter=lambda: "",
        rear_matter=lambda: "",
        cpes=lambda: list(),
        version=lambda: "0",
        profiles=lambda: list(),
        values=lambda: dict(),
        groups=lambda: dict(),
        rules=lambda: dict(),
        product_cpe_names=lambda: list(),
        ** XCCDFEntity.KEYS
    )

    # Keys that must be present in the benchmark YAML definition.
    MANDATORY_KEYS = {
        "title",
        "status",
        "description",
        "front_matter",
        "rear_matter",
        "version",
    }

    # Benchmarks are defined in a file with this fixed name, so the
    # entity ID is derived from the parent directory name.
    GENERIC_FILENAME = "benchmark.yml"
764 | 2 | View Code Duplication | def load_entities(self, rules_by_id, values_by_id, groups_by_id): |
|
|||
765 | for rid, val in self.rules.items(): |
||
766 | if not val: |
||
767 | self.rules[rid] = rules_by_id[rid] |
||
768 | |||
769 | for vid, val in self.values.items(): |
||
770 | if not val: |
||
771 | self.values[vid] = values_by_id[vid] |
||
772 | |||
773 | for gid, val in self.groups.items(): |
||
774 | if not val: |
||
775 | self.groups[gid] = groups_by_id[gid] |
||
776 | |||
777 | 2 | @classmethod |
|
778 | def process_input_dict(cls, input_contents, env_yaml): |
||
779 | input_contents["front_matter"] = input_contents["front-matter"] |
||
780 | del input_contents["front-matter"] |
||
781 | input_contents["rear_matter"] = input_contents["rear-matter"] |
||
782 | del input_contents["rear-matter"] |
||
783 | |||
784 | data = super(Benchmark, cls).process_input_dict(input_contents, env_yaml) |
||
785 | |||
786 | notice_contents = required_key(input_contents, "notice") |
||
787 | del input_contents["notice"] |
||
788 | |||
789 | data["notice_id"] = required_key(notice_contents, "id") |
||
790 | del notice_contents["id"] |
||
791 | |||
792 | data["notice_description"] = required_key(notice_contents, "description") |
||
793 | del notice_contents["description"] |
||
794 | |||
795 | data["version"] = str(data["version"]) |
||
796 | |||
797 | return data |
||
798 | |||
799 | 2 | def represent_as_dict(self): |
|
800 | data = super(Benchmark, cls).represent_as_dict() |
||
801 | data["rear-matter"] = data["rear_matter"] |
||
802 | del data["rear_matter"] |
||
803 | |||
804 | data["front-matter"] = data["front_matter"] |
||
805 | del data["front_matter"] |
||
806 | return data |
||
807 | |||
808 | 2 | @classmethod |
|
809 | 2 | def from_yaml(cls, yaml_file, env_yaml=None, benchmark_id="product-name"): |
|
810 | benchmark = super(Benchmark, cls).from_yaml(yaml_file, env_yaml) |
||
811 | if env_yaml: |
||
812 | benchmark.product_cpe_names = env_yaml["product_cpes"].get_product_cpe_names() |
||
813 | benchmark.cpe_platform_spec = env_yaml["product_cpes"].cpe_platform_specification |
||
814 | |||
815 | benchmark.id_ = benchmark_id |
||
816 | |||
817 | return benchmark |
||
818 | |||
819 | 2 | def add_profiles_from_dir(self, dir_, env_yaml): |
|
820 | for dir_item in sorted(os.listdir(dir_)): |
||
821 | dir_item_path = os.path.join(dir_, dir_item) |
||
822 | if not os.path.isfile(dir_item_path): |
||
823 | continue |
||
824 | |||
825 | _, ext = os.path.splitext(os.path.basename(dir_item_path)) |
||
826 | if ext != '.profile': |
||
827 | sys.stderr.write( |
||
828 | "Encountered file '%s' while looking for profiles, " |
||
829 | "extension '%s' is unknown. Skipping..\n" |
||
830 | % (dir_item, ext) |
||
831 | ) |
||
832 | continue |
||
833 | |||
834 | try: |
||
835 | new_profile = ProfileWithInlinePolicies.from_yaml(dir_item_path, env_yaml) |
||
836 | except DocumentationNotComplete: |
||
837 | continue |
||
838 | except Exception as exc: |
||
839 | msg = ("Error building profile from '{fname}': '{error}'" |
||
840 | .format(fname=dir_item_path, error=str(exc))) |
||
841 | raise RuntimeError(msg) |
||
842 | if new_profile is None: |
||
843 | continue |
||
844 | |||
845 | self.profiles.append(new_profile) |
||
846 | |||
847 | 2 | def to_xml_element(self): |
|
848 | root = ET.Element('Benchmark') |
||
849 | root.set('xmlns:xsi', 'http://www.w3.org/2001/XMLSchema-instance') |
||
850 | root.set('xmlns:xhtml', 'http://www.w3.org/1999/xhtml') |
||
851 | root.set('xmlns:dc', 'http://purl.org/dc/elements/1.1/') |
||
852 | root.set('xmlns:cpe-lang', 'http://cpe.mitre.org/language/2.0') |
||
853 | root.set('id', 'product-name') |
||
854 | root.set('xsi:schemaLocation', |
||
855 | 'http://checklists.nist.gov/xccdf/1.1 xccdf-1.1.4.xsd') |
||
856 | root.set('style', 'SCAP_1.1') |
||
857 | root.set('resolved', 'false') |
||
858 | root.set('xml:lang', 'en-US') |
||
859 | status = ET.SubElement(root, 'status') |
||
860 | status.set('date', datetime.date.today().strftime("%Y-%m-%d")) |
||
861 | status.text = self.status |
||
862 | add_sub_element(root, "title", self.title) |
||
863 | add_sub_element(root, "description", self.description) |
||
864 | notice = add_sub_element(root, "notice", self.notice_description) |
||
865 | notice.set('id', self.notice_id) |
||
866 | add_sub_element(root, "front-matter", self.front_matter) |
||
867 | add_sub_element(root, "rear-matter", self.rear_matter) |
||
868 | # if there are no platforms, do not output platform-specification at all |
||
869 | if len(self.cpe_platform_spec.platforms) > 0: |
||
870 | root.append(self.cpe_platform_spec.to_xml_element()) |
||
871 | |||
872 | # The Benchmark applicability is determined by the CPEs |
||
873 | # defined in the product.yml |
||
874 | for cpe_name in self.product_cpe_names: |
||
875 | plat = ET.SubElement(root, "platform") |
||
876 | plat.set("idref", cpe_name) |
||
877 | |||
878 | version = ET.SubElement(root, 'version') |
||
879 | version.text = self.version |
||
880 | ET.SubElement(root, "metadata") |
||
881 | |||
882 | for profile in self.profiles: |
||
883 | root.append(profile.to_xml_element()) |
||
884 | |||
885 | for value in self.values.values(): |
||
886 | root.append(value.to_xml_element()) |
||
887 | |||
888 | groups_in_bench = list(self.groups.keys()) |
||
889 | priority_order = ["system", "services"] |
||
890 | groups_in_bench = reorder_according_to_ordering(groups_in_bench, priority_order) |
||
891 | |||
892 | # Make system group the first, followed by services group |
||
893 | for group_id in groups_in_bench: |
||
894 | group = self.groups.get(group_id) |
||
895 | # Products using application benchmark don't have system or services group |
||
896 | if group is not None: |
||
897 | root.append(group.to_xml_element()) |
||
898 | |||
899 | for rule in self.rules.values(): |
||
900 | root.append(rule.to_xml_element()) |
||
901 | |||
902 | return root |
||
903 | |||
904 | 2 | def to_file(self, file_name, ): |
|
905 | root = self.to_xml_element() |
||
906 | tree = ET.ElementTree(root) |
||
907 | tree.write(file_name) |
||
908 | |||
909 | 2 | def add_value(self, value): |
|
910 | if value is None: |
||
911 | return |
||
912 | self.values[value.id_] = value |
||
913 | |||
914 | # The benchmark is also considered a group, so this function signature needs to match |
||
915 | # Group()'s add_group() |
||
916 | 2 | def add_group(self, group, env_yaml=None): |
|
917 | if group is None: |
||
918 | return |
||
919 | self.groups[group.id_] = group |
||
920 | |||
921 | 2 | def add_rule(self, rule): |
|
922 | if rule is None: |
||
923 | return |
||
924 | self.rules[rule.id_] = rule |
||
925 | |||
926 | 2 | def to_xccdf(self): |
|
927 | """We can easily extend this script to generate a valid XCCDF instead |
||
928 | of SSG SHORTHAND. |
||
929 | """ |
||
930 | raise NotImplementedError |
||
931 | |||
932 | 2 | def __str__(self): |
|
933 | return self.id_ |
||
934 | |||
935 | |||
class Group(XCCDFEntity):
    """Represents XCCDF Group: a container of rules, values and sub-groups.
    """
    # Attributes that a group propagates to contained rules/groups
    # (see _pass_our_properties_on_to).
    ATTRIBUTES_TO_PASS_ON = (
        "platforms",
        "cpe_platform_names",
    )

    GENERIC_FILENAME = "group.yml"

    KEYS = dict(
        prodtype=lambda: "all",
        title=lambda: "",
        description=lambda: "",
        warnings=lambda: list(),
        requires=lambda: list(),
        conflicts=lambda: list(),
        values=lambda: dict(),
        groups=lambda: dict(),
        rules=lambda: dict(),
        platform=lambda: "",
        platforms=lambda: set(),
        cpe_platform_names=lambda: set(),
        ** XCCDFEntity.KEYS
    )

    # NOTE(review): this set mirrors Benchmark.MANDATORY_KEYS -- front_matter,
    # rear_matter and version are not defined in Group.KEYS above, so this
    # looks like a copy-paste; confirm against XCCDFEntity's validation
    # before changing it.
    MANDATORY_KEYS = {
        "title",
        "status",
        "description",
        "front_matter",
        "rear_matter",
        "version",
    }

    @classmethod
    def process_input_dict(cls, input_contents, env_yaml):
        """Normalize rules/groups/values lists into id->None dicts (payloads
        get resolved later by load_entities) and register CPE platforms."""
        data = super(Group, cls).process_input_dict(input_contents, env_yaml)
        if data["rules"]:
            rule_ids = data["rules"]
            data["rules"] = {rid: None for rid in rule_ids}

        if data["groups"]:
            group_ids = data["groups"]
            data["groups"] = {gid: None for gid in group_ids}

        if data["values"]:
            value_ids = data["values"]
            data["values"] = {vid: None for vid in value_ids}

        if data["platform"]:
            data["platforms"].add(data["platform"])

        # parse platform definition and get CPEAL platform
        if data["platforms"]:
            for platform in data["platforms"]:
                cpe_platform = parse_platform_definition(platform, env_yaml["product_cpes"])
                data["cpe_platform_names"].add(cpe_platform.id)
                # add platform to platform specification
                env_yaml["product_cpes"].cpe_platform_specification.add_platform(cpe_platform)
        return data

    def load_entities(self, rules_by_id, values_by_id, groups_by_id):
        """Replace entity IDs that were stored without a payload by the
        actual Rule/Value/Group objects from the given lookup dicts."""
        for rid, val in self.rules.items():
            if not val:
                self.rules[rid] = rules_by_id[rid]

        for vid, val in self.values.items():
            if not val:
                self.values[vid] = values_by_id[vid]

        for gid, val in self.groups.items():
            if not val:
                self.groups[gid] = groups_by_id[gid]

    def represent_as_dict(self):
        """Serialize back to a dict, flattening member entities to sorted
        lists of their IDs."""
        yaml_contents = super(Group, self).represent_as_dict()

        if self.rules:
            yaml_contents["rules"] = sorted(list(self.rules.keys()))
        if self.groups:
            yaml_contents["groups"] = sorted(list(self.groups.keys()))
        if self.values:
            yaml_contents["values"] = sorted(list(self.values.keys()))

        return yaml_contents

    def validate_prodtype(self, yaml_file):
        """Raise ValueError when a comma-separated prodtype entry carries
        surrounding whitespace."""
        for ptype in self.prodtype.split(","):
            if ptype.strip() != ptype:
                msg = (
                    "Comma-separated '{prodtype}' prodtype "
                    "in {yaml_file} contains whitespace."
                    .format(prodtype=self.prodtype, yaml_file=yaml_file))
                raise ValueError(msg)

    def to_xml_element(self):
        """Serialize the group (and its members, in remediation-friendly
        order) into a 'Group' XML element."""
        group = ET.Element('Group')
        group.set('id', self.id_)
        if self.prodtype != "all":
            group.set("prodtype", self.prodtype)
        title = ET.SubElement(group, 'title')
        title.text = self.title
        add_sub_element(group, 'description', self.description)
        add_warning_elements(group, self.warnings)
        add_nondata_subelements(group, "requires", "id", self.requires)
        add_nondata_subelements(group, "conflicts", "id", self.conflicts)

        for cpe_platform_name in self.cpe_platform_names:
            platform_el = ET.SubElement(group, "platform")
            platform_el.set("idref", "#"+cpe_platform_name)

        for _value in self.values.values():
            group.append(_value.to_xml_element())

        # Rules that install or remove packages affect remediation
        # of other rules.
        # When packages installed/removed rules come first:
        # The Rules are ordered in more logical way, and
        # remediation order is natural, first the package is installed, then configured.
        rules_in_group = list(self.rules.keys())
        regex = (r'(package_.*_(installed|removed))|' +
                 r'(service_.*_(enabled|disabled))|' +
                 r'install_smartcard_packages$')
        priority_order = ["installed", "install_smartcard_packages", "removed",
                          "enabled", "disabled"]
        rules_in_group = reorder_according_to_ordering(rules_in_group, priority_order, regex)

        # Add rules in priority order, first all packages installed, then removed,
        # followed by services enabled, then disabled
        for rule_id in rules_in_group:
            group.append(self.rules.get(rule_id).to_xml_element())

        # Add the sub groups after any current level group rules.
        # As package installed/removed and service enabled/disabled rules are usuallly in
        # top level group, this ensures groups that further configure a package or service
        # are after rules that install or remove it.
        groups_in_group = list(self.groups.keys())
        priority_order = [
            # Make sure rpm_verify_(hashes|permissions|ownership) are run before any other rule.
            # Due to conflicts between rules rpm_verify_* rules and any rule that configures
            # stricter settings, like file_permissions_grub2_cfg and sudo_dedicated_group,
            # the rules deviating from the system default should be evaluated later.
            # So that in the end the system has contents, permissions and ownership reset, and
            # any deviations or stricter settings are applied by the rules in the profile.
            "software", "integrity", "integrity-software", "rpm_verification",

            # The account group has to precede audit group because
            # the rule package_screen_installed is desired to be executed before the rule
            # audit_rules_privileged_commands, othervise the rule
            # does not catch newly installed screen binary during remediation
            # and report fail
            "accounts", "auditing",


            # The FIPS group should come before Crypto,
            # if we want to set a different (stricter) Crypto Policy than FIPS.
            "fips", "crypto",

            # The firewalld_activation must come before ruleset_modifications, othervise
            # remediations for ruleset_modifications won't work
            "firewalld_activation", "ruleset_modifications",

            # Rules from group disabling_ipv6 must precede rules from configuring_ipv6,
            # otherwise the remediation prints error although it is successful
            "disabling_ipv6", "configuring_ipv6"
        ]
        groups_in_group = reorder_according_to_ordering(groups_in_group, priority_order)
        for group_id in groups_in_group:
            _group = self.groups[group_id]
            group.append(_group.to_xml_element())

        return group

    def to_file(self, file_name):
        """Write the group's XML serialization to file_name."""
        root = self.to_xml_element()
        tree = ET.ElementTree(root)
        tree.write(file_name)

    def add_value(self, value):
        """Register a Value; None values are silently ignored."""
        if value is None:
            return
        self.values[value.id_] = value

    def _associate_cpe_platforms(self, obj, env_yaml):
        """Translate obj's platform definitions into CPE platform names and
        register them in the product's platform specification.

        Shared by add_group and add_rule, which previously duplicated
        this logic verbatim. No-op when env_yaml is not provided.
        """
        if not env_yaml:
            return
        for platform in obj.platforms:
            cpe_platform = parse_platform_definition(
                platform, env_yaml["product_cpes"])
            obj.cpe_platform_names.add(cpe_platform.id)
            env_yaml["product_cpes"].cpe_platform_specification.add_platform(
                cpe_platform)

    def add_group(self, group, env_yaml=None):
        """Register a sub-group, propagating this group's platforms and
        passed-on attributes to it."""
        if group is None:
            return
        if self.platforms and not group.platforms:
            group.platforms = self.platforms
        self.groups[group.id_] = group
        self._pass_our_properties_on_to(group)

        # Once the group has inherited properties, update cpe_platform_names
        self._associate_cpe_platforms(group, env_yaml)

    def _pass_our_properties_on_to(self, obj):
        """Copy ATTRIBUTES_TO_PASS_ON onto obj, but only where obj has the
        attribute and it is still None."""
        for attr in self.ATTRIBUTES_TO_PASS_ON:
            if hasattr(obj, attr) and getattr(obj, attr) is None:
                setattr(obj, attr, getattr(self, attr))

    def add_rule(self, rule, env_yaml=None):
        """Register a rule, propagating this group's platforms and
        passed-on attributes to it."""
        if rule is None:
            return
        if self.platforms and not rule.platforms:
            rule.platforms = self.platforms
        self.rules[rule.id_] = rule
        self._pass_our_properties_on_to(rule)

        # Once the rule has inherited properties, update cpe_platform_names
        self._associate_cpe_platforms(rule, env_yaml)

    def __str__(self):
        return self.id_
1161 | |||
1162 | |||
def noop_rule_filterfunc(rule):
    """Accept every rule; used when no filter definition is supplied."""
    return True
1165 | |||
def rule_filter_from_def(filterdef):
    """Compile a filter expression into a predicate over Rule objects.

    An absent or empty definition yields the accept-everything filter.
    NOTE: the definition string is evaluated with eval(), so it must only
    ever come from trusted build configuration, never from untrusted input.
    """
    if filterdef is None or filterdef == "":
        return noop_rule_filterfunc

    def filterfunc(rule):
        # Remove globals for security and only expose
        # variables relevant to the rule
        return eval(filterdef, {"__builtins__": None}, rule.__dict__)

    return filterfunc
1175 | |||
1176 | |||
class Rule(XCCDFEntity):
    """Represents XCCDF Rule
    """
    # Attribute name -> factory for its default value; consumed by
    # XCCDFEntity machinery when loading rule.yml files.
    KEYS = dict(
        prodtype=lambda: "all",
        title=lambda: "",
        description=lambda: "",
        rationale=lambda: "",
        severity=lambda: "",
        references=lambda: dict(),
        identifiers=lambda: dict(),
        ocil_clause=lambda: None,
        ocil=lambda: None,
        oval_external_content=lambda: None,
        warnings=lambda: list(),
        conflicts=lambda: list(),
        requires=lambda: list(),
        platform=lambda: None,
        platforms=lambda: set(),
        inherited_platforms=lambda: list(),
        template=lambda: None,
        cpe_platform_names=lambda: set(),
        ** XCCDFEntity.KEYS
    )

    # Keys that must be present in every rule.yml.
    MANDATORY_KEYS = {
        "title",
        "description",
        "rationale",
        "severity",
    }

    GENERIC_FILENAME = "rule.yml"
    ID_LABEL = "rule_id"

    # References that are expected to carry a product qualifier
    # (e.g. "stigid@rhel8"); see make_refs_and_identifiers_product_specific.
    PRODUCT_REFERENCES = ("stigid", "cis",)
    # References that must never carry a product qualifier;
    # _make_items_product_specific rejects qualified variants of these.
    GLOBAL_REFERENCES = ("srg", "vmmsrg", "disa", "cis-csc",)
1214 | |||
    def __init__(self, id_):
        super(Rule, self).__init__(id_)
        # SCE check metadata for this rule; populated in from_yaml when
        # sce_metadata contains an entry for this rule's ID.
        self.sce_metadata = None
1218 | |||
1219 | 2 | def __deepcopy__(self, memo): |
|
1220 | cls = self.__class__ |
||
1221 | result = cls.__new__(cls) |
||
1222 | memo[id(self)] = result |
||
1223 | for k, v in self.__dict__.items(): |
||
1224 | # These are difficult to deep copy, so let's just re-use them. |
||
1225 | if k != "template" and k != "local_env_yaml": |
||
1226 | setattr(result, k, deepcopy(v, memo)) |
||
1227 | else: |
||
1228 | setattr(result, k, v) |
||
1229 | return result |
||
1230 | |||
    @classmethod
    def from_yaml(cls, yaml_file, env_yaml=None, sce_metadata=None):
        """Build a Rule from a rule.yml file.

        Normalizes the platform fields, resolves CPE platform names when an
        env_yaml is given, attaches SCE metadata when available, and
        validates prodtype, identifiers and references.
        Raises via the validate_* methods on malformed input.
        """
        rule = super(Rule, cls).from_yaml(yaml_file, env_yaml)

        # platforms are read as list from the yaml file
        # we need them to convert to set again
        rule.platforms = set(rule.platforms)

        # rule.platforms.update(set(rule.inherited_platforms))

        check_warnings(rule)

        # ensure that content of rule.platform is in rule.platforms as
        # well
        if rule.platform is not None:
            rule.platforms.add(rule.platform)

        # Convert the platform names to CPE names
        # But only do it if an env_yaml was specified (otherwise there would be no product CPEs
        # to lookup), and the rule's prodtype matches the product being built
        if (
                env_yaml and env_yaml["product"] in parse_prodtype(rule.prodtype)
                or env_yaml and rule.prodtype == "all"):
            # parse platform definition and get CPEAL platform
            for platform in rule.platforms:
                cpe_platform = parse_platform_definition(
                    platform, env_yaml["product_cpes"])
                rule.cpe_platform_names.add(cpe_platform.id)
                # add platform to platform specification
                env_yaml["product_cpes"].cpe_platform_specification.add_platform(
                    cpe_platform)


        if sce_metadata and rule.id_ in sce_metadata:
            rule.sce_metadata = sce_metadata[rule.id_]
            # Path to the SCE check, relative to the built product's
            # content root.
            rule.sce_metadata["relative_path"] = os.path.join(
                env_yaml["product"], "checks/sce", rule.sce_metadata['filename'])

        rule.validate_prodtype(yaml_file)
        rule.validate_identifiers(yaml_file)
        rule.validate_references(yaml_file)
        return rule
1273 | |||
1274 | 2 | def _verify_stigid_format(self, product): |
|
1275 | 2 | stig_id = self.references.get("stigid", None) |
|
1276 | 2 | if not stig_id: |
|
1277 | 2 | return |
|
1278 | 2 | if "," in stig_id: |
|
1279 | 2 | raise ValueError("Rules can not have multiple STIG IDs.") |
|
1280 | |||
1281 | 2 | def _verify_disa_cci_format(self): |
|
1282 | 2 | cci_id = self.references.get("disa", None) |
|
1283 | 2 | if not cci_id: |
|
1284 | 2 | return |
|
1285 | cci_ex = re.compile(r'^CCI-[0-9]{6}$') |
||
1286 | for cci in cci_id.split(","): |
||
1287 | if not cci_ex.match(cci): |
||
1288 | raise ValueError("CCI '{}' is in the wrong format! " |
||
1289 | "Format should be similar to: " |
||
1290 | "CCI-XXXXXX".format(cci)) |
||
1291 | self.references["disa"] = cci_id |
||
1292 | |||
1293 | 2 | def normalize(self, product): |
|
1294 | 2 | try: |
|
1295 | 2 | self.make_refs_and_identifiers_product_specific(product) |
|
1296 | 2 | self.make_template_product_specific(product) |
|
1297 | 2 | except Exception as exc: |
|
1298 | 2 | msg = ( |
|
1299 | "Error normalizing '{rule}': {msg}" |
||
1300 | .format(rule=self.id_, msg=str(exc)) |
||
1301 | ) |
||
1302 | 2 | raise RuntimeError(msg) |
|
1303 | |||
1304 | 2 | def _get_product_only_references(self): |
|
1305 | 2 | product_references = dict() |
|
1306 | |||
1307 | 2 | for ref in Rule.PRODUCT_REFERENCES: |
|
1308 | 2 | start = "{0}@".format(ref) |
|
1309 | 2 | for gref, gval in self.references.items(): |
|
1310 | 2 | if ref == gref or gref.startswith(start): |
|
1311 | 2 | product_references[gref] = gval |
|
1312 | 2 | return product_references |
|
1313 | |||
1314 | 2 | def make_template_product_specific(self, product): |
|
1315 | 2 | product_suffix = "@{0}".format(product) |
|
1316 | |||
1317 | 2 | if not self.template: |
|
1318 | return |
||
1319 | |||
1320 | 2 | not_specific_vars = self.template.get("vars", dict()) |
|
1321 | 2 | specific_vars = self._make_items_product_specific( |
|
1322 | not_specific_vars, product_suffix, True) |
||
1323 | 2 | self.template["vars"] = specific_vars |
|
1324 | |||
1325 | 2 | not_specific_backends = self.template.get("backends", dict()) |
|
1326 | 2 | specific_backends = self._make_items_product_specific( |
|
1327 | not_specific_backends, product_suffix, True) |
||
1328 | 2 | self.template["backends"] = specific_backends |
|
1329 | |||
    def make_refs_and_identifiers_product_specific(self, product):
        """Resolve product-qualified identifiers and references for *product*.

        Splits references into product-only (stigid/cis) and general ones,
        strips the '@product' qualifiers, and reassembles self.references.
        Raises ValueError on qualification conflicts and RuntimeError-wrapped
        format errors via the _verify_* helpers.
        """
        product_suffix = "@{0}".format(product)

        product_references = self._get_product_only_references()
        general_references = self.references.copy()
        for todel in product_references:
            general_references.pop(todel)
        # Product-specific reference types must always carry a qualifier;
        # a bare 'stigid'/'cis' key at this point is an authoring error.
        for ref in Rule.PRODUCT_REFERENCES:
            if ref in general_references:
                msg = "Unexpected reference identifier ({0}) without "
                msg += "product qualifier ({0}@{1}) while building rule "
                msg += "{2}"
                msg = msg.format(ref, product, self.id_)
                raise ValueError(msg)

        # Second tuple member controls allow_overwrites: only general
        # references may have a qualified entry silently override a bare one.
        to_set = dict(
            identifiers=(self.identifiers, False),
            general_references=(general_references, True),
            product_references=(product_references, False),
        )
        for name, (dic, allow_overwrites) in to_set.items():
            try:
                new_items = self._make_items_product_specific(
                    dic, product_suffix, allow_overwrites)
            except ValueError as exc:
                msg = (
                    "Error processing {what} for rule '{rid}': {msg}"
                    .format(what=name, rid=self.id_, msg=str(exc))
                )
                raise ValueError(msg)
            # Mutate in place so self.identifiers (and the local dicts)
            # keep their original object identity.
            dic.clear()
            dic.update(new_items)

        # CCI format is verified before product references are merged back,
        # i.e. only against the general references.
        self.references = general_references
        self._verify_disa_cci_format()
        self.references.update(product_references)

        self._verify_stigid_format(product)
1368 | |||
    def _make_items_product_specific(self, items_dict, product_suffix, allow_overwrites=False):
        """Return a copy of items_dict with 'label@product' keys collapsed
        onto their bare 'label', dropping qualifiers for other products.

        Raises ValueError for product-qualified GLOBAL_REFERENCES and for
        qualified/unqualified value conflicts when allow_overwrites is False.
        """
        new_items = dict()
        for full_label, value in items_dict.items():
            # Unqualified item: keep it unless a qualified entry already
            # resolved to the same bare label earlier in the iteration.
            if "@" not in full_label and full_label not in new_items:
                new_items[full_label] = value
                continue

            label = full_label.split("@")[0]

            # this test should occur before matching product_suffix with the product qualifier
            # present in the reference, so it catches problems even for products that are not
            # being built at the moment
            if label in Rule.GLOBAL_REFERENCES:
                msg = (
                    "You cannot use product-qualified for the '{item_u}' reference. "
                    "Please remove the product-qualifier and merge values with the "
                    "existing reference if there is any. Original line: {item_q}: {value_q}"
                    .format(item_u=label, item_q=full_label, value_q=value)
                )
                raise ValueError(msg)

            # Qualified for a different product: drop silently.
            if not full_label.endswith(product_suffix):
                continue

            if label in items_dict and not allow_overwrites and value != items_dict[label]:
                msg = (
                    "There is a product-qualified '{item_q}' item, "
                    "but also an unqualified '{item_u}' item "
                    "and those two differ in value - "
                    "'{value_q}' vs '{value_u}' respectively."
                    .format(item_q=full_label, item_u=label,
                            value_q=value, value_u=items_dict[label])
                )
                raise ValueError(msg)
            new_items[label] = value
        return new_items
1405 | |||
1406 | 2 | def validate_identifiers(self, yaml_file): |
|
1407 | 2 | if self.identifiers is None: |
|
1408 | raise ValueError("Empty identifier section in file %s" % yaml_file) |
||
1409 | |||
1410 | # Validate all identifiers are non-empty: |
||
1411 | 2 | for ident_type, ident_val in self.identifiers.items(): |
|
1412 | 2 | if not isinstance(ident_type, str) or not isinstance(ident_val, str): |
|
1413 | raise ValueError("Identifiers and values must be strings: %s in file %s" |
||
1414 | % (ident_type, yaml_file)) |
||
1415 | 2 | if ident_val.strip() == "": |
|
1416 | raise ValueError("Identifiers must not be empty: %s in file %s" |
||
1417 | % (ident_type, yaml_file)) |
||
1418 | 2 | if ident_type[0:3] == 'cce': |
|
1419 | 2 | if not is_cce_format_valid(ident_val): |
|
1420 | raise ValueError("CCE Identifier format must be valid: invalid format '%s' for CEE '%s'" |
||
1421 | " in file '%s'" % (ident_val, ident_type, yaml_file)) |
||
1422 | 2 | if not is_cce_value_valid("CCE-" + ident_val): |
|
1423 | raise ValueError("CCE Identifier value is not a valid checksum: invalid value '%s' for CEE '%s'" |
||
1424 | " in file '%s'" % (ident_val, ident_type, yaml_file)) |
||
1425 | |||
1426 | 2 | def validate_references(self, yaml_file): |
|
1427 | 2 | if self.references is None: |
|
1428 | raise ValueError("Empty references section in file %s" % yaml_file) |
||
1429 | |||
1430 | 2 | for ref_type, ref_val in self.references.items(): |
|
1431 | 2 | if not isinstance(ref_type, str) or not isinstance(ref_val, str): |
|
1432 | raise ValueError("References and values must be strings: %s in file %s" |
||
1433 | % (ref_type, yaml_file)) |
||
1434 | 2 | if ref_val.strip() == "": |
|
1435 | raise ValueError("References must not be empty: %s in file %s" |
||
1436 | % (ref_type, yaml_file)) |
||
1437 | |||
1438 | 2 | for ref_type, ref_val in self.references.items(): |
|
1439 | 2 | for ref in ref_val.split(","): |
|
1440 | 2 | if ref.strip() != ref: |
|
1441 | msg = ( |
||
1442 | "Comma-separated '{ref_type}' reference " |
||
1443 | "in {yaml_file} contains whitespace." |
||
1444 | .format(ref_type=ref_type, yaml_file=yaml_file)) |
||
1445 | raise ValueError(msg) |
||
1446 | |||
1447 | 2 | def validate_prodtype(self, yaml_file): |
|
1448 | 2 | for ptype in self.prodtype.split(","): |
|
1449 | 2 | if ptype.strip() != ptype: |
|
1450 | msg = ( |
||
1451 | "Comma-separated '{prodtype}' prodtype " |
||
1452 | "in {yaml_file} contains whitespace." |
||
1453 | .format(prodtype=self.prodtype, yaml_file=yaml_file)) |
||
1454 | raise ValueError(msg) |
||
1455 | |||
1456 | 2 | def to_xml_element(self): |
|
1457 | rule = ET.Element('Rule') |
||
1458 | rule.set('id', self.id_) |
||
1459 | if self.prodtype != "all": |
||
1460 | rule.set("prodtype", self.prodtype) |
||
1461 | rule.set('severity', self.severity) |
||
1462 | add_sub_element(rule, 'title', self.title) |
||
1463 | add_sub_element(rule, 'description', self.description) |
||
1464 | add_sub_element(rule, 'rationale', self.rationale) |
||
1465 | |||
1466 | main_ident = ET.Element('ident') |
||
1467 | for ident_type, ident_val in self.identifiers.items(): |
||
1468 | # This is not true if items were normalized |
||
1469 | if '@' in ident_type: |
||
1470 | # the ident is applicable only on some product |
||
1471 | # format : 'policy@product', eg. 'stigid@product' |
||
1472 | # for them, we create a separate <ref> element |
||
1473 | policy, product = ident_type.split('@') |
||
1474 | ident = ET.SubElement(rule, 'ident') |
||
1475 | ident.set(policy, ident_val) |
||
1476 | ident.set('prodtype', product) |
||
1477 | else: |
||
1478 | main_ident.set(ident_type, ident_val) |
||
1479 | |||
1480 | if main_ident.attrib: |
||
1481 | rule.append(main_ident) |
||
1482 | |||
1483 | main_ref = ET.Element('ref') |
||
1484 | for ref_type, ref_val in self.references.items(): |
||
1485 | # This is not true if items were normalized |
||
1486 | if '@' in ref_type: |
||
1487 | # the reference is applicable only on some product |
||
1488 | # format : 'policy@product', eg. 'stigid@product' |
||
1489 | # for them, we create a separate <ref> element |
||
1490 | policy, product = ref_type.split('@') |
||
1491 | ref = ET.SubElement(rule, 'ref') |
||
1492 | ref.set(policy, ref_val) |
||
1493 | ref.set('prodtype', product) |
||
1494 | else: |
||
1495 | main_ref.set(ref_type, ref_val) |
||
1496 | |||
1497 | if main_ref.attrib: |
||
1498 | rule.append(main_ref) |
||
1499 | |||
1500 | ocil_parent = rule |
||
1501 | check_parent = rule |
||
1502 | |||
1503 | if self.sce_metadata: |
||
1504 | # TODO: This is pretty much another hack, just like the previous OVAL |
||
1505 | # one. However, we avoided the external SCE content as I'm not sure it |
||
1506 | # is generally useful (unlike say, CVE checking with external OVAL) |
||
1507 | # |
||
1508 | # Additionally, we build the content (check subelement) here rather |
||
1509 | # than in xslt due to the nature of our SCE metadata. |
||
1510 | # |
||
1511 | # Finally, before we begin, we might have an element with both SCE |
||
1512 | # and OVAL. We have no way of knowing (right here) whether that is |
||
1513 | # the case (due to a variety of issues, most notably, that linking |
||
1514 | # hasn't yet occurred). So we must rely on the content author's |
||
1515 | # good will, by annotating SCE content with a complex-check tag |
||
1516 | # if necessary. |
||
1517 | |||
1518 | if 'complex-check' in self.sce_metadata: |
||
1519 | # Here we have an issue: XCCDF allows EITHER one or more check |
||
1520 | # elements OR a single complex-check. While we have an explicit |
||
1521 | # case handling the OVAL-and-SCE interaction, OCIL entries have |
||
1522 | # (historically) been alongside OVAL content and been in an |
||
1523 | # "OR" manner -- preferring OVAL to SCE. In order to accomplish |
||
1524 | # this, we thus need to add _yet another parent_ when OCIL data |
||
1525 | # is present, and add update ocil_parent accordingly. |
||
1526 | if self.ocil or self.ocil_clause: |
||
1527 | ocil_parent = ET.SubElement(ocil_parent, "complex-check") |
||
1528 | ocil_parent.set('operator', 'OR') |
||
1529 | |||
1530 | check_parent = ET.SubElement(ocil_parent, "complex-check") |
||
1531 | check_parent.set('operator', self.sce_metadata['complex-check']) |
||
1532 | |||
1533 | # Now, add the SCE check element to the tree. |
||
1534 | check = ET.SubElement(check_parent, "check") |
||
1535 | check.set("system", SCE_SYSTEM) |
||
1536 | |||
1537 | if 'check-import' in self.sce_metadata: |
||
1538 | if isinstance(self.sce_metadata['check-import'], str): |
||
1539 | self.sce_metadata['check-import'] = [self.sce_metadata['check-import']] |
||
1540 | for entry in self.sce_metadata['check-import']: |
||
1541 | check_import = ET.SubElement(check, 'check-import') |
||
1542 | check_import.set('import-name', entry) |
||
1543 | check_import.text = None |
||
1544 | |||
1545 | if 'check-export' in self.sce_metadata: |
||
1546 | if isinstance(self.sce_metadata['check-export'], str): |
||
1547 | self.sce_metadata['check-export'] = [self.sce_metadata['check-export']] |
||
1548 | for entry in self.sce_metadata['check-export']: |
||
1549 | export, value = entry.split('=') |
||
1550 | check_export = ET.SubElement(check, 'check-export') |
||
1551 | check_export.set('value-id', value) |
||
1552 | check_export.set('export-name', export) |
||
1553 | check_export.text = None |
||
1554 | |||
1555 | check_ref = ET.SubElement(check, "check-content-ref") |
||
1556 | href = self.sce_metadata['relative_path'] |
||
1557 | check_ref.set("href", href) |
||
1558 | |||
1559 | if self.oval_external_content: |
||
1560 | check = ET.SubElement(check_parent, 'check') |
||
1561 | check.set("system", "http://oval.mitre.org/XMLSchema/oval-definitions-5") |
||
1562 | external_content = ET.SubElement(check, "check-content-ref") |
||
1563 | external_content.set("href", self.oval_external_content) |
||
1564 | else: |
||
1565 | # TODO: This is pretty much a hack, oval ID will be the same as rule ID |
||
1566 | # and we don't want the developers to have to keep them in sync. |
||
1567 | # Therefore let's just add an OVAL ref of that ID. |
||
1568 | oval_ref = ET.SubElement(check_parent, "oval") |
||
1569 | oval_ref.set("id", self.id_) |
||
1570 | |||
1571 | if self.ocil or self.ocil_clause: |
||
1572 | ocil_check = ET.SubElement(check_parent, "check") |
||
1573 | ocil_check.set("system", ocil_cs) |
||
1574 | ocil_check_ref = ET.SubElement(ocil_check, "check-content-ref") |
||
1575 | ocil_check_ref.set("href", "ocil-unlinked.xml") |
||
1576 | ocil_check_ref.set("name", self.id_ + "_ocil") |
||
1577 | |||
1578 | add_warning_elements(rule, self.warnings) |
||
1579 | add_nondata_subelements(rule, "requires", "id", self.requires) |
||
1580 | add_nondata_subelements(rule, "conflicts", "id", self.conflicts) |
||
1581 | |||
1582 | for cpe_platform_name in self.cpe_platform_names: |
||
1583 | platform_el = ET.SubElement(rule, "platform") |
||
1584 | platform_el.set("idref", "#"+cpe_platform_name) |
||
1585 | |||
1586 | return rule |
||
1587 | |||
1588 | 2 | def to_file(self, file_name): |
|
1589 | root = self.to_xml_element() |
||
1590 | tree = ET.ElementTree(root) |
||
1591 | tree.write(file_name) |
||
1592 | |||
1593 | 2 | def to_ocil(self): |
|
1594 | if not self.ocil and not self.ocil_clause: |
||
1595 | raise ValueError("Rule {0} doesn't have OCIL".format(self.id_)) |
||
1596 | # Create <questionnaire> for the rule |
||
1597 | questionnaire = ET.Element("questionnaire", id=self.id_ + "_ocil") |
||
1598 | title = ET.SubElement(questionnaire, "title") |
||
1599 | title.text = self.title |
||
1600 | actions = ET.SubElement(questionnaire, "actions") |
||
1601 | test_action_ref = ET.SubElement(actions, "test_action_ref") |
||
1602 | test_action_ref.text = self.id_ + "_action" |
||
1603 | # Create <boolean_question_test_action> for the rule |
||
1604 | action = ET.Element( |
||
1605 | "boolean_question_test_action", |
||
1606 | id=self.id_ + "_action", |
||
1607 | question_ref=self.id_ + "_question") |
||
1608 | when_true = ET.SubElement(action, "when_true") |
||
1609 | result = ET.SubElement(when_true, "result") |
||
1610 | result.text = "PASS" |
||
1611 | when_true = ET.SubElement(action, "when_false") |
||
1612 | result = ET.SubElement(when_true, "result") |
||
1613 | result.text = "FAIL" |
||
1614 | # Create <boolean_question> |
||
1615 | boolean_question = ET.Element( |
||
1616 | "boolean_question", id=self.id_ + "_question") |
||
1617 | # TODO: The contents of <question_text> element used to be broken in |
||
1618 | # the legacy XSLT implementation. The following code contains hacks |
||
1619 | # to get the same results as in the legacy XSLT implementation. |
||
1620 | # This enabled us a smooth transition to new OCIL generator |
||
1621 | # without a need to mass-edit rule YAML files. |
||
1622 | # We need to solve: |
||
1623 | # TODO: using variables (aka XCCDF Values) in OCIL content |
||
1624 | # TODO: using HTML formating tags eg. <pre> in OCIL content |
||
1625 | # |
||
1626 | # The "ocil" key in compiled rules contains HTML and XML elements |
||
1627 | # but OCIL question texts shouldn't contain HTML or XML elements, |
||
1628 | # therefore removing them. |
||
1629 | if self.ocil is not None: |
||
1630 | ocil_without_tags = re.sub(r"</?[^>]+>", "", self.ocil) |
||
1631 | else: |
||
1632 | ocil_without_tags = "" |
||
1633 | # The "ocil" key in compiled rules contains XML entities which would |
||
1634 | # be escaped by ET.Subelement() so we need to use add_sub_element() |
||
1635 | # instead because we don't want to escape them. |
||
1636 | question_text = add_sub_element( |
||
1637 | boolean_question, "question_text", ocil_without_tags) |
||
1638 | # The "ocil_clause" key in compiled rules also contains HTML and XML |
||
1639 | # elements but unlike the "ocil" we want to escape the '<' and '>' |
||
1640 | # characters. |
||
1641 | # The empty ocil_clause causing broken question is in line with the |
||
1642 | # legacy XSLT implementation. |
||
1643 | ocil_clause = self.ocil_clause if self.ocil_clause else "" |
||
1644 | question_text.text = ( |
||
1645 | "{0}\n Is it the case that {1}?\n ".format( |
||
1646 | question_text.text if question_text.text is not None else "", |
||
1647 | ocil_clause)) |
||
1648 | return (questionnaire, action, boolean_question) |
||
1649 | |||
1650 | 2 | def __hash__(self): |
|
1651 | """ Controls are meant to be unique, so using the |
||
1652 | ID should suffice""" |
||
1653 | return hash(self.id_) |
||
1654 | |||
1655 | 2 | def __eq__(self, other): |
|
1656 | return isinstance(other, self.__class__) and self.id_ == other.id_ |
||
1657 | |||
1658 | 2 | def __ne__(self, other): |
|
1659 | return not self != other |
||
1660 | |||
1661 | 2 | def __lt__(self, other): |
|
1662 | return self.id_ < other.id_ |
||
1663 | |||
1664 | 2 | def __str__(self): |
|
1665 | return self.id_ |
||
1666 | |||
1667 | |||
class DirectoryLoader(object):
    """Recursive walker over a guide directory tree.

    Collects benchmark/group/rule/value files per directory and aggregates
    the resulting entities for the whole subtree. Subclasses decide how the
    collected YAML files become objects by implementing _process_values(),
    _process_rules() and _get_new_loader().
    """

    def __init__(self, profiles_dir, env_yaml):
        # Files discovered in the directory currently being processed.
        self.benchmark_file = None
        self.group_file = None
        self.loaded_group = None
        self.rule_files = []
        self.value_files = []
        self.subdirectories = []

        # Aggregated results for the whole processed subtree, keyed by ID.
        self.all_values = dict()
        self.all_rules = dict()
        self.all_groups = dict()

        self.profiles_dir = profiles_dir
        self.env_yaml = env_yaml
        self.product = env_yaml["product"]

        # Set by the parent loader when recursing into subdirectories.
        self.parent_group = None

    def _collect_items_to_load(self, guide_directory):
        """Sort the directory contents into the per-kind file lists.

        Raises ValueError when a directory contains more than one
        benchmark.yml or more than one group.yml.
        """
        for dir_item in sorted(os.listdir(guide_directory)):
            dir_item_path = os.path.join(guide_directory, dir_item)
            _, extension = os.path.splitext(dir_item)

            if extension == '.var':
                self.value_files.append(dir_item_path)
            elif dir_item == "benchmark.yml":
                if self.benchmark_file:
                    raise ValueError("Multiple benchmarks in one directory")
                self.benchmark_file = dir_item_path
            elif dir_item == "group.yml":
                if self.group_file:
                    raise ValueError("Multiple groups in one directory")
                self.group_file = dir_item_path
            elif extension == '.rule':
                self.rule_files.append(dir_item_path)
            elif is_rule_dir(dir_item_path):
                self.rule_files.append(get_rule_dir_yaml(dir_item_path))
            elif dir_item != "tests":
                # "tests" directories hold test scenarios, not content.
                if os.path.isdir(dir_item_path):
                    self.subdirectories.append(dir_item_path)
                else:
                    sys.stderr.write(
                        "Encountered file '%s' while recursing, extension '%s' "
                        "is unknown. Skipping..\n"
                        % (dir_item, extension)
                    )

    def load_benchmark_or_group(self, guide_directory):
        """
        Loads a given benchmark or group from the specified benchmark_file or
        group_file, in the context of guide_directory, profiles_dir and env_yaml.

        Returns the loaded group or benchmark.
        """
        group = None
        if self.group_file and self.benchmark_file:
            raise ValueError("A .benchmark file and a .group file were found in "
                             "the same directory '%s'" % (guide_directory))

        # we treat benchmark as a special form of group in the following code
        if self.benchmark_file:
            group = Benchmark.from_yaml(
                self.benchmark_file, self.env_yaml, 'product-name'
            )
            if self.profiles_dir:
                group.add_profiles_from_dir(self.profiles_dir, self.env_yaml)

        if self.group_file:
            group = Group.from_yaml(self.group_file, self.env_yaml)
            self.all_groups[group.id_] = group

        return group

    def _load_group_process_and_recurse(self, guide_directory):
        """Load this directory's group, hook it to its parent, then process
        values, subdirectories and rules (in that order)."""
        self.loaded_group = self.load_benchmark_or_group(guide_directory)

        if self.loaded_group:

            if self.parent_group:
                self.parent_group.add_group(self.loaded_group, env_yaml=self.env_yaml)

            self._process_values()
            self._recurse_into_subdirs()
            self._process_rules()

    def process_directory_tree(self, start_dir, extra_group_dirs=None):
        """Walk start_dir (plus optional extra group directories) and load
        everything found underneath."""
        self._collect_items_to_load(start_dir)
        if extra_group_dirs:
            self.subdirectories += extra_group_dirs
        self._load_group_process_and_recurse(start_dir)

    def process_directory_trees(self, directories):
        """Process the first directory as the root and the remaining ones
        as additional group directories."""
        start_dir = directories[0]
        extra_group_dirs = directories[1:]
        return self.process_directory_tree(start_dir, extra_group_dirs)

    def _recurse_into_subdirs(self):
        """Process every collected subdirectory with a fresh child loader
        and merge its results into this loader's aggregates."""
        for subdir in self.subdirectories:
            loader = self._get_new_loader()
            loader.parent_group = self.loaded_group
            loader.process_directory_tree(subdir)
            self.all_values.update(loader.all_values)
            self.all_rules.update(loader.all_rules)
            self.all_groups.update(loader.all_groups)

    def _get_new_loader(self):
        """Return a new loader of the concrete subclass type."""
        raise NotImplementedError()

    def _process_values(self):
        """Turn collected value files into Value objects."""
        raise NotImplementedError()

    def _process_rules(self):
        """Turn collected rule files into Rule objects."""
        raise NotImplementedError()

    def save_all_entities(self, base_dir):
        """Dump every collected rule, group and value under base_dir.

        All three entity kinds are saved identically, only into different
        subdirectories, so a single loop replaces the former three copies
        of the same stanza.
        """
        for dirname, entities in (
                ("rules", self.all_rules),
                ("groups", self.all_groups),
                ("values", self.all_values)):
            destdir = os.path.join(base_dir, dirname)
            mkdir_p(destdir)
            if entities:
                self.save_entities(entities.values(), destdir)

    def save_entities(self, entities, destdir):
        """Dump each entity as <id>.yml inside destdir."""
        if not entities:
            return
        for entity in entities:
            basename = entity.id_ + ".yml"
            dest_filename = os.path.join(destdir, basename)
            entity.dump_yaml(dest_filename)
1806 | |||
1807 | |||
class BuildLoader(DirectoryLoader):
    """DirectoryLoader that builds Rule and Value objects, filtering rules
    by the product being built and attaching optional SCE check metadata."""

    def __init__(self, profiles_dir, env_yaml,
                 sce_metadata_path=None):
        super(BuildLoader, self).__init__(profiles_dir, env_yaml)

        self.sce_metadata = None
        if sce_metadata_path and os.path.getsize(sce_metadata_path):
            # Use a context manager so the metadata file handle is closed;
            # the previous json.load(open(...)) leaked the open file.
            with open(sce_metadata_path, 'r') as sce_metadata_file:
                self.sce_metadata = json.load(sce_metadata_file)

    def _process_values(self):
        """Instantiate every collected .var file and add it to the group."""
        for value_yaml in self.value_files:
            value = Value.from_yaml(value_yaml, self.env_yaml)
            self.all_values[value.id_] = value
            self.loaded_group.add_value(value)

    def _process_rules(self):
        """Instantiate every collected rule file, keep only rules that apply
        to this product, and attach them to the loaded group."""
        for rule_yaml in self.rule_files:
            try:
                rule = Rule.from_yaml(rule_yaml, self.env_yaml, self.sce_metadata)
            except DocumentationNotComplete:
                # Happens on non-debug build when a rule is "documentation-incomplete"
                continue
            prodtypes = parse_prodtype(rule.prodtype)
            if "all" not in prodtypes and self.product not in prodtypes:
                # The rule is not relevant for the product being built.
                continue
            self.all_rules[rule.id_] = rule
            self.loaded_group.add_rule(rule, env_yaml=self.env_yaml)

            # Rules inherit the platforms of the group that contains them.
            if self.loaded_group.platforms:
                rule.inherited_platforms += self.loaded_group.platforms

            rule.normalize(self.env_yaml["product"])

    def _get_new_loader(self):
        """Create a child loader that shares the already-parsed SCE data."""
        loader = BuildLoader(
            self.profiles_dir, self.env_yaml)
        # Do it this way so we only have to parse the SCE metadata once.
        loader.sce_metadata = self.sce_metadata
        return loader

    def export_group_to_file(self, filename):
        """Write the loaded group (benchmark) tree out as XML."""
        return self.loaded_group.to_file(filename)
1850 | |||
1851 | |||
class LinearLoader(object):
    """Loads already-resolved (compiled) content from per-entity YAML files
    and reassembles it into a benchmark tree for export."""

    def __init__(self, env_yaml, resolved_path):
        # One directory per entity kind under the resolved-content root.
        self.resolved_rules_dir = os.path.join(resolved_path, "rules")
        self.rules = dict()

        self.resolved_profiles_dir = os.path.join(resolved_path, "profiles")
        self.profiles = dict()

        self.resolved_groups_dir = os.path.join(resolved_path, "groups")
        self.groups = dict()

        self.resolved_values_dir = os.path.join(resolved_path, "values")
        self.values = dict()

        self.benchmark = None
        self.env_yaml = env_yaml

    def find_first_groups_ids(self, start_dir):
        """Return the IDs (directory names) of groups directly under
        start_dir, i.e. directories containing a group.yml file.

        Uses os.path.dirname/basename rather than splitting on os.path.sep,
        so paths with mixed separators (e.g. glob output on Windows) are
        handled correctly.
        """
        group_files = glob.glob(os.path.join(start_dir, "*", "group.yml"))
        group_ids = [
            os.path.basename(os.path.dirname(fname)) for fname in group_files]
        return group_ids

    def load_entities_by_id(self, filenames, destination, cls):
        """Deserialize each file via cls.from_yaml and store it in the
        destination dict keyed by entity ID."""
        for fname in filenames:
            entity = cls.from_yaml(fname, self.env_yaml)
            destination[entity.id_] = entity

    def load_benchmark(self, directory):
        """Load benchmark.yml from directory, attach resolved profiles and
        the top-level groups (which must already be loaded)."""
        self.benchmark = Benchmark.from_yaml(
            os.path.join(directory, "benchmark.yml"), self.env_yaml, "product-name")

        self.benchmark.add_profiles_from_dir(self.resolved_profiles_dir, self.env_yaml)

        benchmark_first_groups = self.find_first_groups_ids(directory)
        for gid in benchmark_first_groups:
            self.benchmark.add_group(self.groups[gid], self.env_yaml)

    def load_compiled_content(self):
        """Load all resolved rules, groups, profiles and values, then let
        each group resolve its member entities."""
        filenames = glob.glob(os.path.join(self.resolved_rules_dir, "*.yml"))
        self.load_entities_by_id(filenames, self.rules, Rule)

        filenames = glob.glob(os.path.join(self.resolved_groups_dir, "*.yml"))
        self.load_entities_by_id(filenames, self.groups, Group)

        filenames = glob.glob(os.path.join(self.resolved_profiles_dir, "*.yml"))
        self.load_entities_by_id(filenames, self.profiles, Profile)

        filenames = glob.glob(os.path.join(self.resolved_values_dir, "*.yml"))
        self.load_entities_by_id(filenames, self.values, Value)

        for g in self.groups.values():
            g.load_entities(self.rules, self.values, self.groups)

    def export_benchmark_to_file(self, filename):
        """Write the assembled benchmark out as XML."""
        return self.benchmark.to_file(filename)

    def export_ocil_to_file(self, filename):
        """Write an OCIL document covering every rule that has OCIL data."""
        root = ET.Element('ocil')
        root.set('xmlns:xsi', xsi_namespace)
        root.set("xmlns", ocil_namespace)
        root.set("xmlns:xhtml", xhtml_namespace)
        tree = ET.ElementTree(root)
        generator = ET.SubElement(root, "generator")
        product_name = ET.SubElement(generator, "product_name")
        product_name.text = "build_shorthand.py from SCAP Security Guide"
        product_version = ET.SubElement(generator, "product_version")
        product_version.text = "ssg: " + self.env_yaml["ssg_version_str"]
        schema_version = ET.SubElement(generator, "schema_version")
        schema_version.text = "2.0"
        timestamp_el = ET.SubElement(generator, "timestamp")
        timestamp_el.text = timestamp
        questionnaires = ET.SubElement(root, "questionnaires")
        test_actions = ET.SubElement(root, "test_actions")
        questions = ET.SubElement(root, "questions")
        for rule in self.rules.values():
            # Rules without any OCIL content contribute nothing.
            if not rule.ocil and not rule.ocil_clause:
                continue
            questionnaire, action, boolean_question = rule.to_ocil()
            questionnaires.append(questionnaire)
            test_actions.append(action)
            questions.append(boolean_question)
        tree.write(filename)
1934 |