| Metric | Value |
| --- | --- |
| Total Complexity | 328 |
| Total Lines | 1589 |
| Duplicated Lines | 1.7 % |
| Coverage | 33.98% |
| Changes | 0 |
Duplicate code is one of the most pungent code smells. A commonly used rule of thumb is to restructure code once it is duplicated in three or more places.

Common duplication problems, and their corresponding solutions, are:
Complex classes like ssg.build_yaml often do a lot of different things. To break such a class down, we need to identify a cohesive component within it. A common way to find such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined which fields belong together, you can apply the Extract Class refactoring, as illustrated by the sketch below. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
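For illustration, a minimal, hypothetical sketch of Extract Class follows; the class names (`SlimProfile`, `ProfileXMLRenderer`) and the trivial rendering logic are invented for the example and are not part of ssg.build_yaml.

```python
# Hypothetical Extract Class sketch: the XML-rendering concern is moved out of
# a large "profile" class into its own cohesive collaborator.


class ProfileXMLRenderer(object):
    """Extracted component: everything related to XML rendering lives here."""

    def __init__(self, profile):
        self.profile = profile

    def render(self):
        # Previously a method on the big class; the big class now delegates.
        return "<Profile id='{0}'/>".format(self.profile.id_)


class SlimProfile(object):
    """The original class keeps holding/parsing data and delegates rendering."""

    def __init__(self, id_):
        self.id_ = id_

    def to_xml_element(self):
        return ProfileXMLRenderer(self).render()
```

The same idea applies to the classes in this module: grouping the rendering helpers (or the YAML-parsing helpers) into their own component is what brings the per-class complexity down.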
1 | 2 | from __future__ import absolute_import |
|
2 | 2 | from __future__ import print_function |
|
3 | |||
4 | 2 | import os |
|
5 | 2 | import os.path |
|
6 | 2 | from collections import defaultdict |
|
7 | 2 | from copy import deepcopy |
|
8 | 2 | import datetime |
|
9 | 2 | import re |
|
10 | 2 | import sys |
|
11 | 2 | from xml.sax.saxutils import escape |
|
12 | |||
13 | 2 | import yaml |
|
14 | |||
15 | 2 | from .build_cpe import CPEDoesNotExist |
|
16 | 2 | from .constants import XCCDF_REFINABLE_PROPERTIES |
|
17 | 2 | from .rules import get_rule_dir_id, get_rule_dir_yaml, is_rule_dir |
|
18 | 2 | from .rule_yaml import parse_prodtype |
|
19 | 2 | from .controls import Control |
|
20 | |||
21 | 2 | from .checks import is_cce_format_valid, is_cce_value_valid |
|
22 | 2 | from .yaml import DocumentationNotComplete, open_and_expand, open_and_macro_expand |
|
23 | 2 | from .utils import required_key, mkdir_p |
|
24 | |||
25 | 2 | from .xml import ElementTree as ET |
|
26 | 2 | from .shims import unicode_func |
|
27 | |||
28 | |||
29 | 2 | def add_sub_element(parent, tag, data): |
|
30 | """ |
||
31 | Creates a new child element under parent with tag tag, and sets |
||
32 | data as the content under the tag. In particular, data is a string |
||
33 | to be parsed as an XML tree, allowing sub-elements of children to be |
||
34 | added. |
||
35 | |||
36 | If data should not be parsed as an XML tree, either escape the contents |
||
37 | before passing into this function, or use ElementTree.SubElement(). |
||
38 | |||
39 | Returns the newly created subelement of type tag. |
||
40 | """ |
||
41 | # This is used because our YAML data contain XML and XHTML elements |
||
42 | # ET.SubElement() escapes the < > characters by < and > |
||
43 | # and therefore it does not add child elements |
||
44 | # we need to do a hack instead |
||
45 | # TODO: Remove this function after we move to Markdown everywhere in SSG |
||
46 | ustr = unicode_func("<{0}>{1}</{0}>").format(tag, data) |
||
47 | |||
48 | try: |
||
49 | element = ET.fromstring(ustr.encode("utf-8")) |
||
50 | except Exception: |
||
51 | msg = ("Error adding subelement to an element '{0}' from string: '{1}'" |
||
52 | .format(parent.tag, ustr)) |
||
53 | raise RuntimeError(msg) |
||
54 | |||
55 | parent.append(element) |
||
56 | return element |
||
57 | |||
58 | |||
59 | 2 | def reorder_according_to_ordering(unordered, ordering, regex=None): |
|
60 | 2 | ordered = [] |
|
61 | 2 | if regex is None: |
|
62 | 2 | regex = "|".join(["({0})".format(item) for item in ordering]) |
|
63 | 2 | regex = re.compile(regex) |
|
64 | |||
65 | 2 | items_to_order = list(filter(regex.match, unordered)) |
|
66 | 2 | unordered = set(unordered) |
|
67 | |||
68 | 2 | for priority_type in ordering: |
|
69 | 2 | for item in items_to_order: |
|
70 | 2 | if priority_type in item and item in unordered: |
|
71 | 2 | ordered.append(item) |
|
72 | 2 | unordered.remove(item) |
|
73 | 2 | ordered.extend(list(unordered)) |
|
74 | 2 | return ordered |
|
75 | |||
76 | |||
77 | 2 | def add_warning_elements(element, warnings): |
|
78 | # The use of [{dict}, {dict}] in warnings is to handle the following |
||
79 | # scenario where multiple warnings have the same category which is |
||
80 | # valid in SCAP and our content: |
||
81 | # |
||
82 | # warnings: |
||
83 | # - general: Some general warning |
||
84 | # - general: Some other general warning |
||
85 | # - general: |- |
||
86 | # Some really long multiline general warning |
||
87 | # |
||
88 | # Each of the {dict} should have only one key/value pair. |
||
89 | for warning_dict in warnings: |
||
90 | warning = add_sub_element(element, "warning", list(warning_dict.values())[0]) |
||
91 | warning.set("category", list(warning_dict.keys())[0]) |
||
92 | |||
93 | |||
94 | 2 | def add_nondata_subelements(element, subelement, attribute, attr_data): |
|
95 | """Add multiple iterations of a sublement that contains an attribute but no data |
||
96 | For example, <requires id="my_required_id"/>""" |
||
97 | for data in attr_data: |
||
98 | req = ET.SubElement(element, subelement) |
||
99 | req.set(attribute, data) |
||
100 | |||
101 | |||
102 | 2 | class Profile(object): |
|
103 | """Represents XCCDF profile |
||
104 | """ |
||
105 | |||
106 | 2 | def __init__(self, id_): |
|
107 | 2 | self.id_ = id_ |
|
108 | 2 | self.title = "" |
|
109 | 2 | self.description = "" |
|
110 | 2 | self.extends = None |
|
111 | 2 | self.selected = [] |
|
112 | 2 | self.unselected = [] |
|
113 | 2 | self.variables = dict() |
|
114 | 2 | self.refine_rules = defaultdict(list) |
|
115 | 2 | self.metadata = None |
|
116 | 2 | self.reference = None |
|
117 | # self.platforms is used further in the build system |
||
118 | # self.platform is merged into self.platforms |
||
119 | # it is here for backward compatibility |
||
120 | 2 | self.platforms = set() |
|
121 | 2 | self.cpe_names = set() |
|
122 | 2 | self.platform = None |
|
123 | |||
124 | |||
125 | 2 | def read_yaml_contents(self, yaml_contents): |
|
126 | 2 | self.title = required_key(yaml_contents, "title") |
|
127 | 2 | del yaml_contents["title"] |
|
128 | 2 | self.description = required_key(yaml_contents, "description") |
|
129 | 2 | del yaml_contents["description"] |
|
130 | 2 | self.extends = yaml_contents.pop("extends", None) |
|
131 | 2 | selection_entries = required_key(yaml_contents, "selections") |
|
132 | 2 | if selection_entries: |
|
133 | 2 | self._parse_selections(selection_entries) |
|
134 | 2 | del yaml_contents["selections"] |
|
135 | 2 | self.platforms = yaml_contents.pop("platforms", set()) |
|
136 | 2 | self.platform = yaml_contents.pop("platform", None) |
|
137 | |||
138 | 2 | @classmethod |
|
139 | 2 | def from_yaml(cls, yaml_file, env_yaml=None): |
|
140 | 2 | yaml_contents = open_and_expand(yaml_file, env_yaml) |
|
141 | 2 | if yaml_contents is None: |
|
142 | return None |
||
143 | |||
144 | 2 | basename, _ = os.path.splitext(os.path.basename(yaml_file)) |
|
145 | |||
146 | 2 | profile = cls(basename) |
|
147 | 2 | profile.read_yaml_contents(yaml_contents) |
|
148 | |||
149 | 2 | profile.reference = yaml_contents.pop("reference", None) |
|
150 | # ensure that content of profile.platform is in profile.platforms as |
||
151 | # well |
||
152 | 2 | if profile.platform is not None: |
|
153 | profile.platforms.add(profile.platform) |
||
154 | |||
155 | 2 | if env_yaml: |
|
156 | for platform in profile.platforms: |
||
157 | try: |
||
158 | profile.cpe_names.add(env_yaml["product_cpes"].get_cpe_name(platform)) |
||
159 | except CPEDoesNotExist: |
||
160 | print("Unsupported platform '%s' in profile '%s'." % (platform, profile.id_)) |
||
161 | raise |
||
162 | |||
163 | # At the moment, metadata is not used to build content |
||
164 | 2 | if "metadata" in yaml_contents: |
|
165 | del yaml_contents["metadata"] |
||
166 | |||
167 | 2 | if yaml_contents: |
|
168 | raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s" |
||
169 | % (yaml_file, yaml_contents)) |
||
170 | |||
171 | 2 | return profile |
|
172 | |||
173 | 2 | def dump_yaml(self, file_name, documentation_complete=True): |
|
174 | to_dump = {} |
||
175 | to_dump["documentation_complete"] = documentation_complete |
||
176 | to_dump["title"] = self.title |
||
177 | to_dump["description"] = self.description |
||
178 | to_dump["reference"] = self.reference |
||
179 | if self.metadata is not None: |
||
180 | to_dump["metadata"] = self.metadata |
||
181 | |||
182 | if self.extends is not None: |
||
183 | to_dump["extends"] = self.extends |
||
184 | |||
185 | if self.platforms: |
||
186 | to_dump["platforms"] = self.platforms |
||
187 | |||
188 | selections = [] |
||
189 | for item in self.selected: |
||
190 | selections.append(item) |
||
191 | for item in self.unselected: |
||
192 | selections.append("!"+item) |
||
193 | for varname in self.variables.keys(): |
||
194 | selections.append(varname+"="+self.variables.get(varname)) |
||
195 | for rule, refinements in self.refine_rules.items(): |
||
196 | for prop, val in refinements: |
||
197 | selections.append("{rule}.{property}={value}" |
||
198 | .format(rule=rule, property=prop, value=val)) |
||
199 | to_dump["selections"] = selections |
||
200 | with open(file_name, "w+") as f: |
||
201 | yaml.dump(to_dump, f, indent=4) |
||
202 | |||
203 | 2 | def _parse_selections(self, entries): |
|
204 | 2 | for item in entries: |
|
205 | 2 | self.apply_selection(item) |
|
206 | |||
207 | 2 | def apply_selection(self, item): |
|
208 | 2 | if "." in item: |
|
209 | rule, refinement = item.split(".", 1) |
||
210 | property_, value = refinement.split("=", 1) |
||
211 | if property_ not in XCCDF_REFINABLE_PROPERTIES: |
||
212 | msg = ("Property '{property_}' cannot be refined. " |
||
213 | "Rule properties that can be refined are {refinables}. " |
||
214 | "Fix refinement '{rule_id}.{property_}={value}' in profile '{profile}'." |
||
215 | .format(property_=property_, refinables=XCCDF_REFINABLE_PROPERTIES, |
||
216 | rule_id=rule, value=value, profile=self.id_) |
||
217 | ) |
||
218 | raise ValueError(msg) |
||
219 | self.refine_rules[rule].append((property_, value)) |
||
220 | 2 | elif "=" in item: |
|
221 | 2 | varname, value = item.split("=", 1) |
|
222 | 2 | self.variables[varname] = value |
|
223 | 2 | elif item.startswith("!"): |
|
224 | self.unselected.append(item[1:]) |
||
225 | else: |
||
226 | 2 | self.selected.append(item) |
|
227 | |||
228 | 2 | def to_xml_element(self): |
|
229 | element = ET.Element('Profile') |
||
230 | element.set("id", self.id_) |
||
231 | if self.extends: |
||
232 | element.set("extends", self.extends) |
||
233 | title = add_sub_element(element, "title", self.title) |
||
234 | title.set("override", "true") |
||
235 | desc = add_sub_element(element, "description", self.description) |
||
236 | desc.set("override", "true") |
||
237 | |||
238 | if self.reference: |
||
239 | add_sub_element(element, "reference", escape(self.reference)) |
||
240 | |||
241 | for cpe_name in self.cpe_names: |
||
242 | plat = ET.SubElement(element, "platform") |
||
243 | plat.set("idref", cpe_name) |
||
244 | |||
245 | for selection in self.selected: |
||
246 | select = ET.Element("select") |
||
247 | select.set("idref", selection) |
||
248 | select.set("selected", "true") |
||
249 | element.append(select) |
||
250 | |||
251 | for selection in self.unselected: |
||
252 | unselect = ET.Element("select") |
||
253 | unselect.set("idref", selection) |
||
254 | unselect.set("selected", "false") |
||
255 | element.append(unselect) |
||
256 | |||
257 | for value_id, selector in self.variables.items(): |
||
258 | refine_value = ET.Element("refine-value") |
||
259 | refine_value.set("idref", value_id) |
||
260 | refine_value.set("selector", selector) |
||
261 | element.append(refine_value) |
||
262 | |||
263 | for refined_rule, refinement_list in self.refine_rules.items(): |
||
264 | refine_rule = ET.Element("refine-rule") |
||
265 | refine_rule.set("idref", refined_rule) |
||
266 | for refinement in refinement_list: |
||
267 | refine_rule.set(refinement[0], refinement[1]) |
||
268 | element.append(refine_rule) |
||
269 | |||
270 | return element |
||
271 | |||
272 | 2 | def get_rule_selectors(self): |
|
273 | 2 | return list(self.selected + self.unselected) |
|
274 | |||
275 | 2 | def get_variable_selectors(self): |
|
276 | 2 | return self.variables |
|
277 | |||
278 | 2 | def validate_refine_rules(self, rules): |
|
279 | existing_rule_ids = [r.id_ for r in rules] |
||
280 | for refine_rule, refinement_list in self.refine_rules.items(): |
||
281 | # Take first refinement to ilustrate where the error is |
||
282 | # all refinements in list are invalid, so it doesn't really matter |
||
283 | a_refinement = refinement_list[0] |
||
284 | |||
285 | if refine_rule not in existing_rule_ids: |
||
286 | msg = ( |
||
287 | "You are trying to refine a rule that doesn't exist. " |
||
288 | "Rule '{rule_id}' was not found in the benchmark. " |
||
289 | "Please check all rule refinements for rule: '{rule_id}', for example: " |
||
290 | "- {rule_id}.{property_}={value}' in profile {profile_id}." |
||
291 | .format(rule_id=refine_rule, profile_id=self.id_, |
||
292 | property_=a_refinement[0], value=a_refinement[1]) |
||
293 | ) |
||
294 | raise ValueError(msg) |
||
295 | |||
296 | if refine_rule not in self.get_rule_selectors(): |
||
297 | msg = ("- {rule_id}.{property_}={value}' in profile '{profile_id}' is refining " |
||
298 | "a rule that is not selected by it. The refinement will not have any " |
||
299 | "noticeable effect. Either select the rule or remove the rule refinement." |
||
300 | .format(rule_id=refine_rule, property_=a_refinement[0], |
||
301 | value=a_refinement[1], profile_id=self.id_) |
||
302 | ) |
||
303 | raise ValueError(msg) |
||
304 | |||
305 | 2 | def validate_variables(self, variables): |
|
306 | variables_by_id = dict() |
||
307 | for var in variables: |
||
308 | variables_by_id[var.id_] = var |
||
309 | |||
310 | for var_id, our_val in self.variables.items(): |
||
311 | if var_id not in variables_by_id: |
||
312 | all_vars_list = [" - %s" % v for v in variables_by_id.keys()] |
||
313 | msg = ( |
||
314 | "Value '{var_id}' in profile '{profile_name}' is not known. " |
||
315 | "We know only variables:\n{var_names}" |
||
316 | .format( |
||
317 | var_id=var_id, profile_name=self.id_, |
||
318 | var_names="\n".join(sorted(all_vars_list))) |
||
319 | ) |
||
320 | raise ValueError(msg) |
||
321 | |||
322 | allowed_selectors = [str(s) for s in variables_by_id[var_id].options.keys()] |
||
323 | if our_val not in allowed_selectors: |
||
324 | msg = ( |
||
325 | "Value '{var_id}' in profile '{profile_name}' " |
||
326 | "uses the selector '{our_val}'. " |
||
327 | "This is not possible, as only selectors {all_selectors} are available. " |
||
328 | "Either change the selector used in the profile, or " |
||
329 | "add the selector-value pair to the variable definition." |
||
330 | .format( |
||
331 | var_id=var_id, profile_name=self.id_, our_val=our_val, |
||
332 | all_selectors=allowed_selectors, |
||
333 | ) |
||
334 | ) |
||
335 | raise ValueError(msg) |
||
336 | |||
337 | 2 | def validate_rules(self, rules, groups): |
|
338 | existing_rule_ids = [r.id_ for r in rules] |
||
339 | rule_selectors = self.get_rule_selectors() |
||
340 | for id_ in rule_selectors: |
||
341 | if id_ in groups: |
||
342 | msg = ( |
||
343 | "You have selected a group '{group_id}' instead of a " |
||
344 | "rule. Groups have no effect in the profile and are not " |
||
345 | "allowed to be selected. Please remove '{group_id}' " |
||
346 | "from profile '{profile_id}' before proceeding." |
||
347 | .format(group_id=id_, profile_id=self.id_) |
||
348 | ) |
||
349 | raise ValueError(msg) |
||
350 | if id_ not in existing_rule_ids: |
||
351 | msg = ( |
||
352 | "Rule '{rule_id}' was not found in the benchmark. Please " |
||
353 | "remove rule '{rule_id}' from profile '{profile_id}' " |
||
354 | "before proceeding." |
||
355 | .format(rule_id=id_, profile_id=self.id_) |
||
356 | ) |
||
357 | raise ValueError(msg) |
||
358 | |||
359 | 2 | def __sub__(self, other): |
|
360 | profile = Profile(self.id_) |
||
361 | profile.title = self.title |
||
362 | profile.description = self.description |
||
363 | profile.extends = self.extends |
||
364 | profile.platforms = self.platforms |
||
365 | profile.platform = self.platform |
||
366 | profile.selected = list(set(self.selected) - set(other.selected)) |
||
367 | profile.selected.sort() |
||
368 | profile.unselected = list(set(self.unselected) - set(other.unselected)) |
||
369 | profile.variables = dict ((k, v) for (k, v) in self.variables.items() |
||
370 | if k not in other.variables or v != other.variables[k]) |
||
371 | return profile |
||
372 | |||
373 | |||
374 | 2 | class ResolvableProfile(Profile): |
|
375 | 2 | def __init__(self, * args, ** kwargs): |
|
376 | super(ResolvableProfile, self).__init__(* args, ** kwargs) |
||
377 | self.resolved = False |
||
378 | self.resolved_selections = set() |
||
379 | |||
380 | 2 | def _controls_ids_to_controls(self, controls_manager, policy_id, control_id_list): |
|
381 | items = [controls_manager.get_control(policy_id, cid) for cid in control_id_list] |
||
382 | return items |
||
383 | |||
384 | 2 | def _merge_control(self, control): |
|
385 | self.selected.extend(control.rules) |
||
386 | for varname, value in control.variables.items(): |
||
387 | if varname not in self.variables: |
||
388 | self.variables[varname] = value |
||
389 | |||
390 | 2 | def resolve_controls(self, controls_manager): |
|
391 | pass |
||
392 | |||
393 | 2 | def extend_by(self, extended_profile): |
|
394 | extended_selects = set(extended_profile.selected) |
||
395 | self.resolved_selections.update(extended_selects) |
||
396 | |||
397 | updated_variables = dict(extended_profile.variables) |
||
398 | updated_variables.update(self.variables) |
||
399 | self.variables = updated_variables |
||
400 | |||
401 | extended_refinements = deepcopy(extended_profile.refine_rules) |
||
402 | updated_refinements = self._subtract_refinements(extended_refinements) |
||
403 | updated_refinements.update(self.refine_rules) |
||
404 | self.refine_rules = updated_refinements |
||
405 | |||
406 | 2 | def resolve(self, all_profiles, controls_manager=None): |
|
407 | if self.resolved: |
||
408 | return |
||
409 | |||
410 | self.resolve_controls(controls_manager) |
||
411 | |||
412 | self.resolved_selections = set(self.selected) |
||
413 | |||
414 | if self.extends: |
||
415 | if self.extends not in all_profiles: |
||
416 | msg = ( |
||
417 | "Profile {name} extends profile {extended}, but " |
||
418 | "only profiles {known_profiles} are available for resolution." |
||
419 | .format(name=self.id_, extended=self.extends, |
||
420 | known_profiles=list(all_profiles.keys()))) |
||
421 | raise RuntimeError(msg) |
||
422 | extended_profile = all_profiles[self.extends] |
||
423 | extended_profile.resolve(all_profiles, controls_manager) |
||
424 | |||
425 | self.extend_by(extended_profile) |
||
426 | |||
427 | for uns in self.unselected: |
||
428 | self.resolved_selections.discard(uns) |
||
429 | |||
430 | self.unselected = [] |
||
431 | self.extends = None |
||
432 | |||
433 | self.selected = sorted(self.resolved_selections) |
||
434 | |||
435 | self.resolved = True |
||
436 | |||
437 | 2 | def _subtract_refinements(self, extended_refinements): |
|
438 | """ |
||
439 | Given a dict of rule refinements from the extended profile, |
||
440 | "undo" every refinement prefixed with '!' in this profile. |
||
441 | """ |
||
442 | for rule, refinements in list(self.refine_rules.items()): |
||
443 | if rule.startswith("!"): |
||
444 | for prop, val in refinements: |
||
445 | extended_refinements[rule[1:]].remove((prop, val)) |
||
446 | del self.refine_rules[rule] |
||
447 | return extended_refinements |
||
448 | |||
449 | |||
450 | 2 | class ProfileWithSeparatePolicies(ResolvableProfile): |
|
451 | 2 | def __init__(self, * args, ** kwargs): |
|
452 | super(ProfileWithSeparatePolicies, self).__init__(* args, ** kwargs) |
||
453 | self.policies = {} |
||
454 | |||
455 | 2 | def read_yaml_contents(self, yaml_contents): |
|
456 | policies = yaml_contents.pop("policies", None) |
||
457 | if policies: |
||
458 | self._parse_policies(policies) |
||
459 | super(ProfileWithSeparatePolicies, self).read_yaml_contents(yaml_contents) |
||
460 | |||
461 | 2 | def _parse_policies(self, policies_yaml): |
|
462 | for item in policies_yaml: |
||
463 | id_ = required_key(item, "id") |
||
464 | controls_ids = required_key(item, "controls") |
||
465 | if not isinstance(controls_ids, list): |
||
466 | if controls_ids != "all": |
||
467 | msg = ( |
||
468 | "Policy {id_} contains invalid controls list {controls}." |
||
469 | .format(id_=id_, controls=str(controls_ids))) |
||
470 | raise ValueError(msg) |
||
471 | self.policies[id_] = controls_ids |
||
472 | |||
473 | 2 | View Code Duplication | def _process_controls_ids_into_controls(self, controls_manager, policy_id, controls_ids): |
|
|||
474 | controls = [] |
||
475 | for cid in controls_ids: |
||
476 | if not cid.startswith("all"): |
||
477 | controls.extend( |
||
478 | self._controls_ids_to_controls(controls_manager, policy_id, [cid])) |
||
479 | elif ":" in cid: |
||
480 | _, level_id = cid.split(":", 1) |
||
481 | controls.extend( |
||
482 | controls_manager.get_all_controls_of_level(policy_id, level_id)) |
||
483 | else: |
||
484 | controls.extend(controls_manager.get_all_controls(policy_id)) |
||
485 | return controls |
||
486 | |||
487 | 2 | def resolve_controls(self, controls_manager): |
|
488 | for policy_id, controls_ids in self.policies.items(): |
||
489 | controls = [] |
||
490 | |||
491 | if isinstance(controls_ids, list): |
||
492 | controls = self._process_controls_ids_into_controls( |
||
493 | controls_manager, policy_id, controls_ids) |
||
494 | elif controls_ids.startswith("all"): |
||
495 | controls = self._process_controls_ids_into_controls( |
||
496 | controls_manager, policy_id, [controls_ids]) |
||
497 | else: |
||
498 | msg = ( |
||
499 | "Unknown policy content {content} in profile {profile_id}" |
||
500 | .format(content=controls_ids, profile_id=self.id_)) |
||
501 | raise ValueError(msg) |
||
502 | |||
503 | for c in controls: |
||
504 | self._merge_control(c) |
||
505 | |||
506 | 2 | def extend_by(self, extended_profile): |
|
507 | self.policies.update(extended_profile.policies) |
||
508 | super(ProfileWithSeparatePolicies, self).extend_by(extended_profile) |
||
509 | |||
510 | |||
511 | 2 | class ProfileWithInlinePolicies(ResolvableProfile): |
|
512 | 2 | def __init__(self, * args, ** kwargs): |
|
513 | super(ProfileWithInlinePolicies, self).__init__(* args, ** kwargs) |
||
514 | self.controls_by_policy = defaultdict(list) |
||
515 | |||
516 | 2 | def apply_selection(self, item): |
|
517 | # ":" is the delimiter for controls but not when the item is a variable |
||
518 | if ":" in item and "=" not in item: |
||
519 | policy_id, control_id = item.split(":", 1) |
||
520 | self.controls_by_policy[policy_id].append(control_id) |
||
521 | else: |
||
522 | super(ProfileWithInlinePolicies, self).apply_selection(item) |
||
523 | |||
524 | 2 | View Code Duplication | def _process_controls_ids_into_controls(self, controls_manager, policy_id, controls_ids): |
525 | controls = [] |
||
526 | for cid in controls_ids: |
||
527 | if not cid.startswith("all"): |
||
528 | controls.extend( |
||
529 | self._controls_ids_to_controls(controls_manager, policy_id, [cid])) |
||
530 | elif ":" in cid: |
||
531 | _, level_id = cid.split(":", 1) |
||
532 | controls.extend( |
||
533 | controls_manager.get_all_controls_of_level(policy_id, level_id)) |
||
534 | else: |
||
535 | controls.extend( |
||
536 | controls_manager.get_all_controls(policy_id)) |
||
537 | return controls |
||
538 | |||
539 | 2 | def resolve_controls(self, controls_manager): |
|
540 | for policy_id, controls_ids in self.controls_by_policy.items(): |
||
541 | controls = self._process_controls_ids_into_controls( |
||
542 | controls_manager, policy_id, controls_ids) |
||
543 | |||
544 | for c in controls: |
||
545 | self._merge_control(c) |
||
546 | |||
547 | |||
548 | 2 | class Value(object): |
|
549 | """Represents XCCDF Value |
||
550 | """ |
||
551 | |||
552 | 2 | def __init__(self, id_): |
|
553 | 2 | self.id_ = id_ |
|
554 | 2 | self.title = "" |
|
555 | 2 | self.description = "" |
|
556 | 2 | self.type_ = "string" |
|
557 | 2 | self.operator = "equals" |
|
558 | 2 | self.interactive = False |
|
559 | 2 | self.options = {} |
|
560 | 2 | self.warnings = [] |
|
561 | |||
562 | 2 | @staticmethod |
|
563 | 2 | def from_yaml(yaml_file, env_yaml=None): |
|
564 | 2 | yaml_contents = open_and_macro_expand(yaml_file, env_yaml) |
|
565 | 2 | if yaml_contents is None: |
|
566 | return None |
||
567 | |||
568 | 2 | value_id, _ = os.path.splitext(os.path.basename(yaml_file)) |
|
569 | 2 | value = Value(value_id) |
|
570 | 2 | value.title = required_key(yaml_contents, "title") |
|
571 | 2 | del yaml_contents["title"] |
|
572 | 2 | value.description = required_key(yaml_contents, "description") |
|
573 | 2 | del yaml_contents["description"] |
|
574 | 2 | value.type_ = required_key(yaml_contents, "type") |
|
575 | 2 | del yaml_contents["type"] |
|
576 | 2 | value.operator = yaml_contents.pop("operator", "equals") |
|
577 | 2 | possible_operators = ["equals", "not equal", "greater than", |
|
578 | "less than", "greater than or equal", |
||
579 | "less than or equal", "pattern match"] |
||
580 | |||
581 | 2 | if value.operator not in possible_operators: |
|
582 | raise ValueError( |
||
583 | "Found an invalid operator value '%s' in '%s'. " |
||
584 | "Expected one of: %s" |
||
585 | % (value.operator, yaml_file, ", ".join(possible_operators)) |
||
586 | ) |
||
587 | |||
588 | 2 | value.interactive = \ |
|
589 | yaml_contents.pop("interactive", "false").lower() == "true" |
||
590 | |||
591 | 2 | value.options = required_key(yaml_contents, "options") |
|
592 | 2 | del yaml_contents["options"] |
|
593 | 2 | value.warnings = yaml_contents.pop("warnings", []) |
|
594 | |||
595 | 2 | for warning_list in value.warnings: |
|
596 | if len(warning_list) != 1: |
||
597 | raise ValueError("Only one key/value pair should exist for each dictionary") |
||
598 | |||
599 | 2 | if yaml_contents: |
|
600 | raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s" |
||
601 | % (yaml_file, yaml_contents)) |
||
602 | |||
603 | 2 | return value |
|
604 | |||
605 | 2 | def to_xml_element(self): |
|
606 | value = ET.Element('Value') |
||
607 | value.set('id', self.id_) |
||
608 | value.set('type', self.type_) |
||
609 | if self.operator != "equals": # equals is the default |
||
610 | value.set('operator', self.operator) |
||
611 | if self.interactive: # False is the default |
||
612 | value.set('interactive', 'true') |
||
613 | title = ET.SubElement(value, 'title') |
||
614 | title.text = self.title |
||
615 | add_sub_element(value, 'description', self.description) |
||
616 | add_warning_elements(value, self.warnings) |
||
617 | |||
618 | for selector, option in self.options.items(): |
||
619 | # do not confuse Value with big V with value with small v |
||
620 | # value is child element of Value |
||
621 | value_small = ET.SubElement(value, 'value') |
||
622 | # by XCCDF spec, default value is value without selector |
||
623 | if selector != "default": |
||
624 | value_small.set('selector', str(selector)) |
||
625 | value_small.text = str(option) |
||
626 | |||
627 | return value |
||
628 | |||
629 | 2 | def to_file(self, file_name): |
|
630 | root = self.to_xml_element() |
||
631 | tree = ET.ElementTree(root) |
||
632 | tree.write(file_name) |
||
633 | |||
634 | |||
635 | 2 | class Benchmark(object): |
|
636 | """Represents XCCDF Benchmark |
||
637 | """ |
||
638 | 2 | def __init__(self, id_): |
|
639 | self.id_ = id_ |
||
640 | self.title = "" |
||
641 | self.status = "" |
||
642 | self.description = "" |
||
643 | self.notice_id = "" |
||
644 | self.notice_description = "" |
||
645 | self.front_matter = "" |
||
646 | self.rear_matter = "" |
||
647 | self.cpes = [] |
||
648 | self.version = "0.1" |
||
649 | self.profiles = [] |
||
650 | self.values = {} |
||
651 | self.bash_remediation_fns_group = None |
||
652 | self.groups = {} |
||
653 | self.rules = {} |
||
654 | self.product_cpe_names = [] |
||
655 | |||
656 | # This is required for OCIL clauses |
||
657 | conditional_clause = Value("conditional_clause") |
||
658 | conditional_clause.title = "A conditional clause for check statements." |
||
659 | conditional_clause.description = conditional_clause.title |
||
660 | conditional_clause.type_ = "string" |
||
661 | conditional_clause.options = {"": "This is a placeholder"} |
||
662 | |||
663 | self.add_value(conditional_clause) |
||
664 | |||
665 | 2 | @classmethod |
|
666 | 2 | def from_yaml(cls, yaml_file, id_, env_yaml=None): |
|
667 | yaml_contents = open_and_macro_expand(yaml_file, env_yaml) |
||
668 | if yaml_contents is None: |
||
669 | return None |
||
670 | |||
671 | benchmark = cls(id_) |
||
672 | benchmark.title = required_key(yaml_contents, "title") |
||
673 | del yaml_contents["title"] |
||
674 | benchmark.status = required_key(yaml_contents, "status") |
||
675 | del yaml_contents["status"] |
||
676 | benchmark.description = required_key(yaml_contents, "description") |
||
677 | del yaml_contents["description"] |
||
678 | notice_contents = required_key(yaml_contents, "notice") |
||
679 | benchmark.notice_id = required_key(notice_contents, "id") |
||
680 | del notice_contents["id"] |
||
681 | benchmark.notice_description = required_key(notice_contents, |
||
682 | "description") |
||
683 | del notice_contents["description"] |
||
684 | if not notice_contents: |
||
685 | del yaml_contents["notice"] |
||
686 | |||
687 | benchmark.front_matter = required_key(yaml_contents, |
||
688 | "front-matter") |
||
689 | del yaml_contents["front-matter"] |
||
690 | benchmark.rear_matter = required_key(yaml_contents, |
||
691 | "rear-matter") |
||
692 | del yaml_contents["rear-matter"] |
||
693 | benchmark.version = str(required_key(yaml_contents, "version")) |
||
694 | del yaml_contents["version"] |
||
695 | |||
696 | if env_yaml: |
||
697 | benchmark.product_cpe_names = env_yaml["product_cpes"].get_product_cpe_names() |
||
698 | |||
699 | if yaml_contents: |
||
700 | raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s" |
||
701 | % (yaml_file, yaml_contents)) |
||
702 | |||
703 | return benchmark |
||
704 | |||
705 | 2 | def add_profiles_from_dir(self, dir_, env_yaml): |
|
706 | for dir_item in sorted(os.listdir(dir_)): |
||
707 | dir_item_path = os.path.join(dir_, dir_item) |
||
708 | if not os.path.isfile(dir_item_path): |
||
709 | continue |
||
710 | |||
711 | _, ext = os.path.splitext(os.path.basename(dir_item_path)) |
||
712 | if ext != '.profile': |
||
713 | sys.stderr.write( |
||
714 | "Encountered file '%s' while looking for profiles, " |
||
715 | "extension '%s' is unknown. Skipping..\n" |
||
716 | % (dir_item, ext) |
||
717 | ) |
||
718 | continue |
||
719 | |||
720 | try: |
||
721 | new_profile = ProfileWithInlinePolicies.from_yaml(dir_item_path, env_yaml) |
||
722 | except DocumentationNotComplete: |
||
723 | continue |
||
724 | except Exception as exc: |
||
725 | msg = ("Error building profile from '{fname}': '{error}'" |
||
726 | .format(fname=dir_item_path, error=str(exc))) |
||
727 | raise RuntimeError(msg) |
||
728 | if new_profile is None: |
||
729 | continue |
||
730 | |||
731 | self.profiles.append(new_profile) |
||
732 | |||
733 | 2 | def add_bash_remediation_fns_from_file(self, file_): |
|
734 | if not file_: |
||
735 | # bash-remediation-functions.xml doens't exist |
||
736 | return |
||
737 | |||
738 | tree = ET.parse(file_) |
||
739 | self.bash_remediation_fns_group = tree.getroot() |
||
740 | |||
741 | 2 | def to_xml_element(self): |
|
742 | root = ET.Element('Benchmark') |
||
743 | root.set('xmlns:xsi', 'http://www.w3.org/2001/XMLSchema-instance') |
||
744 | root.set('xmlns:xhtml', 'http://www.w3.org/1999/xhtml') |
||
745 | root.set('xmlns:dc', 'http://purl.org/dc/elements/1.1/') |
||
746 | root.set('id', 'product-name') |
||
747 | root.set('xsi:schemaLocation', |
||
748 | 'http://checklists.nist.gov/xccdf/1.1 xccdf-1.1.4.xsd') |
||
749 | root.set('style', 'SCAP_1.1') |
||
750 | root.set('resolved', 'false') |
||
751 | root.set('xml:lang', 'en-US') |
||
752 | status = ET.SubElement(root, 'status') |
||
753 | status.set('date', datetime.date.today().strftime("%Y-%m-%d")) |
||
754 | status.text = self.status |
||
755 | add_sub_element(root, "title", self.title) |
||
756 | add_sub_element(root, "description", self.description) |
||
757 | notice = add_sub_element(root, "notice", self.notice_description) |
||
758 | notice.set('id', self.notice_id) |
||
759 | add_sub_element(root, "front-matter", self.front_matter) |
||
760 | add_sub_element(root, "rear-matter", self.rear_matter) |
||
761 | |||
762 | # The Benchmark applicability is determined by the CPEs |
||
763 | # defined in the product.yml |
||
764 | for cpe_name in self.product_cpe_names: |
||
765 | plat = ET.SubElement(root, "platform") |
||
766 | plat.set("idref", cpe_name) |
||
767 | |||
768 | version = ET.SubElement(root, 'version') |
||
769 | version.text = self.version |
||
770 | ET.SubElement(root, "metadata") |
||
771 | |||
772 | for profile in self.profiles: |
||
773 | root.append(profile.to_xml_element()) |
||
774 | |||
775 | for value in self.values.values(): |
||
776 | root.append(value.to_xml_element()) |
||
777 | if self.bash_remediation_fns_group is not None: |
||
778 | root.append(self.bash_remediation_fns_group) |
||
779 | |||
780 | groups_in_bench = list(self.groups.keys()) |
||
781 | priority_order = ["system", "services"] |
||
782 | groups_in_bench = reorder_according_to_ordering(groups_in_bench, priority_order) |
||
783 | |||
784 | # Make system group the first, followed by services group |
||
785 | for group_id in groups_in_bench: |
||
786 | group = self.groups.get(group_id) |
||
787 | # Products using application benchmark don't have system or services group |
||
788 | if group is not None: |
||
789 | root.append(group.to_xml_element()) |
||
790 | |||
791 | for rule in self.rules.values(): |
||
792 | root.append(rule.to_xml_element()) |
||
793 | |||
794 | return root |
||
795 | |||
796 | 2 | def to_file(self, file_name, ): |
|
797 | root = self.to_xml_element() |
||
798 | tree = ET.ElementTree(root) |
||
799 | tree.write(file_name) |
||
800 | |||
801 | 2 | def add_value(self, value): |
|
802 | if value is None: |
||
803 | return |
||
804 | self.values[value.id_] = value |
||
805 | |||
806 | # The benchmark is also considered a group, so this function signature needs to match |
||
807 | # Group()'s add_group() |
||
808 | 2 | def add_group(self, group, env_yaml=None): |
|
809 | if group is None: |
||
810 | return |
||
811 | self.groups[group.id_] = group |
||
812 | |||
813 | 2 | def add_rule(self, rule): |
|
814 | if rule is None: |
||
815 | return |
||
816 | self.rules[rule.id_] = rule |
||
817 | |||
818 | 2 | def to_xccdf(self): |
|
819 | """We can easily extend this script to generate a valid XCCDF instead |
||
820 | of SSG SHORTHAND. |
||
821 | """ |
||
822 | raise NotImplementedError |
||
823 | |||
824 | 2 | def __str__(self): |
|
825 | return self.id_ |
||
826 | |||
827 | |||
828 | 2 | class Group(object): |
|
829 | """Represents XCCDF Group |
||
830 | """ |
||
831 | 2 | ATTRIBUTES_TO_PASS_ON = ( |
|
832 | "platforms", |
||
833 | ) |
||
834 | |||
835 | 2 | def __init__(self, id_): |
|
836 | self.id_ = id_ |
||
837 | self.prodtype = "all" |
||
838 | self.title = "" |
||
839 | self.description = "" |
||
840 | self.warnings = [] |
||
841 | self.requires = [] |
||
842 | self.conflicts = [] |
||
843 | self.values = {} |
||
844 | self.groups = {} |
||
845 | self.rules = {} |
||
846 | # self.platforms is used further in the build system |
||
847 | # self.platform is merged into self.platforms |
||
848 | # it is here for backward compatibility |
||
849 | self.platforms = set() |
||
850 | self.cpe_names = set() |
||
851 | self.platform = None |
||
852 | |||
853 | 2 | @classmethod |
|
854 | 2 | def from_yaml(cls, yaml_file, env_yaml=None): |
|
855 | yaml_contents = open_and_macro_expand(yaml_file, env_yaml) |
||
856 | if yaml_contents is None: |
||
857 | return None |
||
858 | |||
859 | group_id = os.path.basename(os.path.dirname(yaml_file)) |
||
860 | group = cls(group_id) |
||
861 | group.prodtype = yaml_contents.pop("prodtype", "all") |
||
862 | group.title = required_key(yaml_contents, "title") |
||
863 | del yaml_contents["title"] |
||
864 | group.description = required_key(yaml_contents, "description") |
||
865 | del yaml_contents["description"] |
||
866 | group.warnings = yaml_contents.pop("warnings", []) |
||
867 | group.conflicts = yaml_contents.pop("conflicts", []) |
||
868 | group.requires = yaml_contents.pop("requires", []) |
||
869 | group.platform = yaml_contents.pop("platform", None) |
||
870 | group.platforms = yaml_contents.pop("platforms", set()) |
||
871 | # ensure that content of group.platform is in group.platforms as |
||
872 | # well |
||
873 | if group.platform is not None: |
||
874 | group.platforms.add(group.platform) |
||
875 | |||
876 | if env_yaml: |
||
877 | for platform in group.platforms: |
||
878 | try: |
||
879 | group.cpe_names.add(env_yaml["product_cpes"].get_cpe_name(platform)) |
||
880 | except CPEDoesNotExist: |
||
881 | print("Unsupported platform '%s' in group '%s'." % (platform, group.id_)) |
||
882 | raise |
||
883 | |||
884 | for warning_list in group.warnings: |
||
885 | if len(warning_list) != 1: |
||
886 | raise ValueError("Only one key/value pair should exist for each dictionary") |
||
887 | |||
888 | if yaml_contents: |
||
889 | raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s" |
||
890 | % (yaml_file, yaml_contents)) |
||
891 | group.validate_prodtype(yaml_file) |
||
892 | return group |
||
893 | |||
894 | 2 | def validate_prodtype(self, yaml_file): |
|
895 | for ptype in self.prodtype.split(","): |
||
896 | if ptype.strip() != ptype: |
||
897 | msg = ( |
||
898 | "Comma-separated '{prodtype}' prodtype " |
||
899 | "in {yaml_file} contains whitespace." |
||
900 | .format(prodtype=self.prodtype, yaml_file=yaml_file)) |
||
901 | raise ValueError(msg) |
||
902 | |||
903 | 2 | def to_xml_element(self): |
|
904 | group = ET.Element('Group') |
||
905 | group.set('id', self.id_) |
||
906 | if self.prodtype != "all": |
||
907 | group.set("prodtype", self.prodtype) |
||
908 | title = ET.SubElement(group, 'title') |
||
909 | title.text = self.title |
||
910 | add_sub_element(group, 'description', self.description) |
||
911 | add_warning_elements(group, self.warnings) |
||
912 | add_nondata_subelements(group, "requires", "id", self.requires) |
||
913 | add_nondata_subelements(group, "conflicts", "id", self.conflicts) |
||
914 | |||
915 | for cpe_name in self.cpe_names: |
||
916 | platform_el = ET.SubElement(group, "platform") |
||
917 | platform_el.set("idref", cpe_name) |
||
918 | |||
919 | for _value in self.values.values(): |
||
920 | group.append(_value.to_xml_element()) |
||
921 | |||
922 | # Rules that install or remove packages affect remediation |
||
923 | # of other rules. |
||
924 | # When packages installed/removed rules come first: |
||
925 | # The Rules are ordered in more logical way, and |
||
926 | # remediation order is natural, first the package is installed, then configured. |
||
927 | rules_in_group = list(self.rules.keys()) |
||
928 | regex = r'(package_.*_(installed|removed))|(service_.*_(enabled|disabled))$' |
||
929 | priority_order = ["installed", "removed", "enabled", "disabled"] |
||
930 | rules_in_group = reorder_according_to_ordering(rules_in_group, priority_order, regex) |
||
931 | |||
932 | # Add rules in priority order, first all packages installed, then removed, |
||
933 | # followed by services enabled, then disabled |
||
934 | for rule_id in rules_in_group: |
||
935 | group.append(self.rules.get(rule_id).to_xml_element()) |
||
936 | |||
937 | # Add the sub groups after any current level group rules. |
||
938 | # As package installed/removed and service enabled/disabled rules are usuallly in |
||
939 | # top level group, this ensures groups that further configure a package or service |
||
940 | # are after rules that install or remove it. |
||
941 | groups_in_group = list(self.groups.keys()) |
||
942 | priority_order = [ |
||
943 | # Make sure rpm_verify_(hashes|permissions|ownership) are run before any other rule. |
||
944 | # Due to conflicts between rules rpm_verify_* rules and any rule that configures |
||
945 | # stricter settings, like file_permissions_grub2_cfg and sudo_dedicated_group, |
||
946 | # the rules deviating from the system default should be evaluated later. |
||
947 | # So that in the end the system has contents, permissions and ownership reset, and |
||
948 | # any deviations or stricter settings are applied by the rules in the profile. |
||
949 | "software", "integrity", "integrity-software", "rpm_verification", |
||
950 | |||
951 | # The account group has to precede audit group because |
||
952 | # the rule package_screen_installed is desired to be executed before the rule |
||
953 | # audit_rules_privileged_commands, othervise the rule |
||
954 | # does not catch newly installed screen binary during remediation |
||
955 | # and report fail |
||
956 | "accounts", "auditing", |
||
957 | |||
958 | |||
959 | # The FIPS group should come before Crypto, |
||
960 | # if we want to set a different (stricter) Crypto Policy than FIPS. |
||
961 | "fips", "crypto", |
||
962 | |||
963 | # The firewalld_activation must come before ruleset_modifications, othervise |
||
964 | # remediations for ruleset_modifications won't work |
||
965 | "firewalld_activation", "ruleset_modifications", |
||
966 | |||
967 | # Rules from group disabling_ipv6 must precede rules from configuring_ipv6, |
||
968 | # otherwise the remediation prints error although it is successful |
||
969 | "disabling_ipv6", "configuring_ipv6" |
||
970 | ] |
||
971 | groups_in_group = reorder_according_to_ordering(groups_in_group, priority_order) |
||
972 | for group_id in groups_in_group: |
||
973 | _group = self.groups[group_id] |
||
974 | group.append(_group.to_xml_element()) |
||
975 | |||
976 | return group |
||
977 | |||
978 | 2 | def to_file(self, file_name): |
|
979 | root = self.to_xml_element() |
||
980 | tree = ET.ElementTree(root) |
||
981 | tree.write(file_name) |
||
982 | |||
983 | 2 | def add_value(self, value): |
|
984 | if value is None: |
||
985 | return |
||
986 | self.values[value.id_] = value |
||
987 | |||
988 | 2 | def add_group(self, group, env_yaml=None): |
|
989 | if group is None: |
||
990 | return |
||
991 | if self.platforms and not group.platforms: |
||
992 | group.platforms = self.platforms |
||
993 | self.groups[group.id_] = group |
||
994 | self._pass_our_properties_on_to(group) |
||
995 | |||
996 | # Once the group has inherited properties, update cpe_names |
||
997 | if env_yaml: |
||
998 | for platform in group.platforms: |
||
999 | try: |
||
1000 | group.cpe_names.add(env_yaml["product_cpes"].get_cpe_name(platform)) |
||
1001 | except CPEDoesNotExist: |
||
1002 | print("Unsupported platform '%s' in group '%s'." % (platform, group.id_)) |
||
1003 | raise |
||
1004 | |||
1005 | |||
1006 | 2 | def _pass_our_properties_on_to(self, obj): |
|
1007 | for attr in self.ATTRIBUTES_TO_PASS_ON: |
||
1008 | if hasattr(obj, attr) and getattr(obj, attr) is None: |
||
1009 | setattr(obj, attr, getattr(self, attr)) |
||
1010 | |||
1011 | 2 | def add_rule(self, rule, env_yaml=None): |
|
1012 | if rule is None: |
||
1013 | return |
||
1014 | if self.platforms and not rule.platforms: |
||
1015 | rule.platforms = self.platforms |
||
1016 | self.rules[rule.id_] = rule |
||
1017 | self._pass_our_properties_on_to(rule) |
||
1018 | |||
1019 | # Once the rule has inherited properties, update cpe_names |
||
1020 | if env_yaml: |
||
1021 | for platform in rule.platforms: |
||
1022 | try: |
||
1023 | rule.cpe_names.add(env_yaml["product_cpes"].get_cpe_name(platform)) |
||
1024 | except CPEDoesNotExist: |
||
1025 | print("Unsupported platform '%s' in rule '%s'." % (platform, rule.id_)) |
||
1026 | raise |
||
1027 | |||
1028 | 2 | def __str__(self): |
|
1029 | return self.id_ |
||
1030 | |||
1031 | |||
1032 | 2 | class Rule(object): |
|
1033 | """Represents XCCDF Rule |
||
1034 | """ |
||
1035 | 2 | YAML_KEYS_DEFAULTS = { |
|
1036 | "prodtype": lambda: "all", |
||
1037 | "title": lambda: RuntimeError("Missing key 'title'"), |
||
1038 | "description": lambda: RuntimeError("Missing key 'description'"), |
||
1039 | "rationale": lambda: RuntimeError("Missing key 'rationale'"), |
||
1040 | "severity": lambda: RuntimeError("Missing key 'severity'"), |
||
1041 | "references": lambda: dict(), |
||
1042 | "identifiers": lambda: dict(), |
||
1043 | "ocil_clause": lambda: None, |
||
1044 | "ocil": lambda: None, |
||
1045 | "oval_external_content": lambda: None, |
||
1046 | "warnings": lambda: list(), |
||
1047 | "conflicts": lambda: list(), |
||
1048 | "requires": lambda: list(), |
||
1049 | "platform": lambda: None, |
||
1050 | "platforms": lambda: set(), |
||
1051 | "inherited_platforms": lambda: list(), |
||
1052 | "template": lambda: None, |
||
1053 | "definition_location": lambda: None, |
||
1054 | } |
||
1055 | |||
1056 | 2 | PRODUCT_REFERENCES = ("stigid", "cis",) |
|
1057 | 2 | GLOBAL_REFERENCES = ("srg", "vmmsrg", "disa", "cis-csc",) |
|
1058 | |||
1059 | 2 | def __init__(self, id_): |
|
1060 | 2 | self.id_ = id_ |
|
1061 | 2 | self.prodtype = "all" |
|
1062 | 2 | self.title = "" |
|
1063 | 2 | self.description = "" |
|
1064 | 2 | self.definition_location = "" |
|
1065 | 2 | self.rationale = "" |
|
1066 | 2 | self.severity = "unknown" |
|
1067 | 2 | self.references = {} |
|
1068 | 2 | self.identifiers = {} |
|
1069 | 2 | self.ocil_clause = None |
|
1070 | 2 | self.ocil = None |
|
1071 | 2 | self.oval_external_content = None |
|
1072 | 2 | self.warnings = [] |
|
1073 | 2 | self.requires = [] |
|
1074 | 2 | self.conflicts = [] |
|
1075 | # self.platforms is used further in the build system |
||
1076 | # self.platform is merged into self.platforms |
||
1077 | # it is here for backward compatibility |
||
1078 | 2 | self.platform = None |
|
1079 | 2 | self.platforms = set() |
|
1080 | 2 | self.cpe_names = set() |
|
1081 | 2 | self.inherited_platforms = [] # platforms inherited from the group |
|
1082 | 2 | self.template = None |
|
1083 | 2 | self.local_env_yaml = None |
|
1084 | |||
1085 | 2 | @classmethod |
|
1086 | 2 | def from_yaml(cls, yaml_file, env_yaml=None): |
|
1087 | 2 | yaml_file = os.path.normpath(yaml_file) |
|
1088 | |||
1089 | 2 | rule_id, ext = os.path.splitext(os.path.basename(yaml_file)) |
|
1090 | 2 | if rule_id == "rule" and ext == ".yml": |
|
1091 | 2 | rule_id = get_rule_dir_id(yaml_file) |
|
1092 | |||
1093 | 2 | local_env_yaml = None |
|
1094 | 2 | if env_yaml: |
|
1095 | local_env_yaml = dict() |
||
1096 | local_env_yaml.update(env_yaml) |
||
1097 | local_env_yaml["rule_id"] = rule_id |
||
1098 | |||
1099 | 2 | yaml_contents = open_and_macro_expand(yaml_file, local_env_yaml) |
|
1100 | 2 | if yaml_contents is None: |
|
1101 | return None |
||
1102 | |||
1103 | 2 | rule = cls(rule_id) |
|
1104 | |||
1105 | 2 | if local_env_yaml: |
|
1106 | rule.local_env_yaml = local_env_yaml |
||
1107 | |||
1108 | 2 | try: |
|
1109 | 2 | rule._set_attributes_from_dict(yaml_contents) |
|
1110 | except RuntimeError as exc: |
||
1111 | msg = ("Error processing '{fname}': {err}" |
||
1112 | .format(fname=yaml_file, err=str(exc))) |
||
1113 | raise RuntimeError(msg) |
||
1114 | |||
1115 | # platforms are read as list from the yaml file |
||
1116 | # we need them to convert to set again |
||
1117 | 2 | rule.platforms = set(rule.platforms) |
|
1118 | |||
1119 | 2 | for warning_list in rule.warnings: |
|
1120 | if len(warning_list) != 1: |
||
1121 | raise ValueError("Only one key/value pair should exist for each dictionary") |
||
1122 | |||
1123 | # ensure that content of rule.platform is in rule.platforms as |
||
1124 | # well |
||
1125 | 2 | if rule.platform is not None: |
|
1126 | 2 | rule.platforms.add(rule.platform) |
|
1127 | |||
1128 | # Convert the platform names to CPE names |
||
1129 | # But only do it if an env_yaml was specified (otherwise there would be no product CPEs |
||
1130 | # to lookup), and the rule's prodtype matches the product being built |
||
1131 | 2 | if env_yaml and env_yaml["product"] in parse_prodtype(rule.prodtype): |
|
1132 | for platform in rule.platforms: |
||
1133 | try: |
||
1134 | rule.cpe_names.add(env_yaml["product_cpes"].get_cpe_name(platform)) |
||
1135 | except CPEDoesNotExist: |
||
1136 | print("Unsupported platform '%s' in rule '%s'." % (platform, rule.id_)) |
||
1137 | raise |
||
1138 | |||
1139 | 2 | if yaml_contents: |
|
1140 | raise RuntimeError("Unparsed YAML data in '%s'.\n\n%s" |
||
1141 | % (yaml_file, yaml_contents)) |
||
1142 | |||
1143 | 2 | if not rule.definition_location: |
|
1144 | 2 | rule.definition_location = yaml_file |
|
1145 | |||
1146 | 2 | rule.validate_prodtype(yaml_file) |
|
1147 | 2 | rule.validate_identifiers(yaml_file) |
|
1148 | 2 | rule.validate_references(yaml_file) |
|
1149 | 2 | return rule |
|
1150 | |||
1151 | 2 | def _verify_stigid_format(self, product): |
|
1152 | 2 | stig_id = self.references.get("stigid", None) |
|
1153 | 2 | if not stig_id: |
|
1154 | 2 | return |
|
1155 | 2 | if "," in stig_id: |
|
1156 | 2 | raise ValueError("Rules can not have multiple STIG IDs.") |
|
1157 | |||
1158 | 2 | def _verify_disa_cci_format(self): |
|
1159 | 2 | cci_id = self.references.get("disa", None) |
|
1160 | 2 | if not cci_id: |
|
1161 | 2 | return |
|
1162 | cci_ex = re.compile(r'^CCI-[0-9]{6}$') |
||
1163 | for cci in cci_id.split(","): |
||
1164 | if not cci_ex.match(cci): |
||
1165 | raise ValueError("CCI '{}' is in the wrong format! " |
||
1166 | "Format should be similar to: " |
||
1167 | "CCI-XXXXXX".format(cci)) |
||
1168 | self.references["disa"] = cci_id |
||
1169 | |||
1170 | 2 | def normalize(self, product): |
|
1171 | 2 | try: |
|
1172 | 2 | self.make_refs_and_identifiers_product_specific(product) |
|
1173 | 2 | self.make_template_product_specific(product) |
|
1174 | 2 | except Exception as exc: |
|
1175 | 2 | msg = ( |
|
1176 | "Error normalizing '{rule}': {msg}" |
||
1177 | .format(rule=self.id_, msg=str(exc)) |
||
1178 | ) |
||
1179 | 2 | raise RuntimeError(msg) |
|
1180 | |||
1181 | 2 | def _get_product_only_references(self): |
|
1182 | 2 | product_references = dict() |
|
1183 | |||
1184 | 2 | for ref in Rule.PRODUCT_REFERENCES: |
|
1185 | 2 | start = "{0}@".format(ref) |
|
1186 | 2 | for gref, gval in self.references.items(): |
|
1187 | 2 | if ref == gref or gref.startswith(start): |
|
1188 | 2 | product_references[gref] = gval |
|
1189 | 2 | return product_references |
|
1190 | |||
1191 | 2 | def make_template_product_specific(self, product): |
|
1192 | 2 | product_suffix = "@{0}".format(product) |
|
1193 | |||
1194 | 2 | if not self.template: |
|
1195 | return |
||
1196 | |||
1197 | 2 | not_specific_vars = self.template.get("vars", dict()) |
|
1198 | 2 | specific_vars = self._make_items_product_specific( |
|
1199 | not_specific_vars, product_suffix, True) |
||
1200 | 2 | self.template["vars"] = specific_vars |
|
1201 | |||
1202 | 2 | not_specific_backends = self.template.get("backends", dict()) |
|
1203 | 2 | specific_backends = self._make_items_product_specific( |
|
1204 | not_specific_backends, product_suffix, True) |
||
1205 | 2 | self.template["backends"] = specific_backends |
|
1206 | |||
1207 | 2 | def make_refs_and_identifiers_product_specific(self, product): |
|
1208 | 2 | product_suffix = "@{0}".format(product) |
|
1209 | |||
1210 | 2 | product_references = self._get_product_only_references() |
|
1211 | 2 | general_references = self.references.copy() |
|
1212 | 2 | for todel in product_references: |
|
1213 | 2 | general_references.pop(todel) |
|
1214 | 2 | for ref in Rule.PRODUCT_REFERENCES: |
|
1215 | 2 | if ref in general_references: |
|
1216 | msg = "Unexpected reference identifier ({0}) without " |
||
1217 | msg += "product qualifier ({0}@{1}) while building rule " |
||
1218 | msg += "{2}" |
||
1219 | msg = msg.format(ref, product, self.id_) |
||
1220 | raise ValueError(msg) |
||
1221 | |||
1222 | 2 | to_set = dict( |
|
1223 | identifiers=(self.identifiers, False), |
||
1224 | general_references=(general_references, True), |
||
1225 | product_references=(product_references, False), |
||
1226 | ) |
||
1227 | 2 | for name, (dic, allow_overwrites) in to_set.items(): |
|
1228 | 2 | try: |
|
1229 | 2 | new_items = self._make_items_product_specific( |
|
1230 | dic, product_suffix, allow_overwrites) |
||
1231 | 2 | except ValueError as exc: |
|
1232 | 2 | msg = ( |
|
1233 | "Error processing {what} for rule '{rid}': {msg}" |
||
1234 | .format(what=name, rid=self.id_, msg=str(exc)) |
||
1235 | ) |
||
1236 | 2 | raise ValueError(msg) |
|
1237 | 2 | dic.clear() |
|
1238 | 2 | dic.update(new_items) |
|
1239 | |||
1240 | 2 | self.references = general_references |
|
1241 | 2 | self._verify_disa_cci_format() |
|
1242 | 2 | self.references.update(product_references) |
|
1243 | |||
1244 | 2 | self._verify_stigid_format(product) |
|
1245 | |||
1246 | 2 | def _make_items_product_specific(self, items_dict, product_suffix, allow_overwrites=False): |
|
1247 | 2 | new_items = dict() |
|
1248 | 2 | for full_label, value in items_dict.items(): |
|
1249 | 2 | if "@" not in full_label and full_label not in new_items: |
|
1250 | 2 | new_items[full_label] = value |
|
1251 | 2 | continue |
|
1252 | |||
1253 | 2 | label = full_label.split("@")[0] |
|
1254 | |||
1255 | # this test should occur before matching product_suffix with the product qualifier |
||
1256 | # present in the reference, so it catches problems even for products that are not |
||
1257 | # being built at the moment |
||
1258 | 2 | if label in Rule.GLOBAL_REFERENCES: |
|
1259 | msg = ( |
||
1260 | "You cannot use product-qualified for the '{item_u}' reference. " |
||
1261 | "Please remove the product-qualifier and merge values with the " |
||
1262 | "existing reference if there is any. Original line: {item_q}: {value_q}" |
||
1263 | .format(item_u=label, item_q=full_label, value_q=value) |
||
1264 | ) |
||
1265 | raise ValueError(msg) |
||
1266 | |||
1267 | 2 | if not full_label.endswith(product_suffix): |
|
1268 | 2 | continue |
|
1269 | |||
1270 | 2 | if label in items_dict and not allow_overwrites and value != items_dict[label]: |
|
1271 | 2 | msg = ( |
|
1272 | "There is a product-qualified '{item_q}' item, " |
||
1273 | "but also an unqualified '{item_u}' item " |
||
1274 | "and those two differ in value - " |
||
1275 | "'{value_q}' vs '{value_u}' respectively." |
||
1276 | .format(item_q=full_label, item_u=label, |
||
1277 | value_q=value, value_u=items_dict[label]) |
||
1278 | ) |
||
1279 | 2 | raise ValueError(msg) |
|
1280 | 2 | new_items[label] = value |
|
1281 | 2 | return new_items |
|
1282 | |||
1283 | 2 | def _set_attributes_from_dict(self, yaml_contents): |
|
1284 | 2 | for key, default_getter in self.YAML_KEYS_DEFAULTS.items(): |
|
1285 | 2 | if key not in yaml_contents: |
|
1286 | 2 | value = default_getter() |
|
1287 | 2 | if isinstance(value, Exception): |
|
1288 | raise value |
||
1289 | else: |
||
1290 | 2 | value = yaml_contents.pop(key) |
|
1291 | |||
1292 | 2 | setattr(self, key, value) |
|
1293 | |||
1294 | 2 | def to_contents_dict(self): |
|
1295 | """ |
||
1296 | Returns a dictionary that is the same schema as the dict obtained when loading rule YAML. |
||
1297 | """ |
||
1298 | |||
1299 | 2 | yaml_contents = dict() |
|
1300 | 2 | for key in Rule.YAML_KEYS_DEFAULTS: |
|
1301 | 2 | yaml_contents[key] = getattr(self, key) |
|
1302 | |||
1303 | 2 | return yaml_contents |
|
1304 | |||
1305 | 2 | def validate_identifiers(self, yaml_file): |
|
1306 | 2 | if self.identifiers is None: |
|
1307 | raise ValueError("Empty identifier section in file %s" % yaml_file) |
||
1308 | |||
1309 | # Validate all identifiers are non-empty: |
||
1310 | 2 | for ident_type, ident_val in self.identifiers.items(): |
|
1311 | 2 | if not isinstance(ident_type, str) or not isinstance(ident_val, str): |
|
1312 | raise ValueError("Identifiers and values must be strings: %s in file %s" |
||
1313 | % (ident_type, yaml_file)) |
||
1314 | 2 | if ident_val.strip() == "": |
|
1315 | raise ValueError("Identifiers must not be empty: %s in file %s" |
||
1316 | % (ident_type, yaml_file)) |
||
1317 | 2 | if ident_type[0:3] == 'cce': |
|
1318 | 2 | if not is_cce_format_valid(ident_val): |
|
1319 | raise ValueError("CCE Identifier format must be valid: invalid format '%s' for CEE '%s'" |
||
1320 | " in file '%s'" % (ident_val, ident_type, yaml_file)) |
||
1321 | 2 | if not is_cce_value_valid("CCE-" + ident_val): |
|
1322 | raise ValueError("CCE Identifier value is not a valid checksum: invalid value '%s' for CEE '%s'" |
||
1323 | " in file '%s'" % (ident_val, ident_type, yaml_file)) |
||
1324 | |||
1325 | 2 | def validate_references(self, yaml_file): |
|
1326 | 2 | if self.references is None: |
|
1327 | raise ValueError("Empty references section in file %s" % yaml_file) |
||
1328 | |||
1329 | 2 | for ref_type, ref_val in self.references.items(): |
|
1330 | 2 | if not isinstance(ref_type, str) or not isinstance(ref_val, str): |
|
1331 | raise ValueError("References and values must be strings: %s in file %s" |
||
1332 | % (ref_type, yaml_file)) |
||
1333 | 2 | if ref_val.strip() == "": |
|
1334 | raise ValueError("References must not be empty: %s in file %s" |
||
1335 | % (ref_type, yaml_file)) |
||
1336 | |||
1337 | 2 | for ref_type, ref_val in self.references.items(): |
|
1338 | 2 | for ref in ref_val.split(","): |
|
1339 | 2 | if ref.strip() != ref: |
|
1340 | msg = ( |
||
1341 | "Comma-separated '{ref_type}' reference " |
||
1342 | "in {yaml_file} contains whitespace." |
||
1343 | .format(ref_type=ref_type, yaml_file=yaml_file)) |
||
1344 | raise ValueError(msg) |
||
1345 | |||
1346 | 2 | def validate_prodtype(self, yaml_file): |
|
1347 | 2 | for ptype in self.prodtype.split(","): |
|
1348 | 2 | if ptype.strip() != ptype: |
|
1349 | msg = ( |
||
1350 | "Comma-separated '{prodtype}' prodtype " |
||
1351 | "in {yaml_file} contains whitespace." |
||
1352 | .format(prodtype=self.prodtype, yaml_file=yaml_file)) |
||
1353 | raise ValueError(msg) |
||
1354 | |||
1355 | 2 | def to_xml_element(self): |
|
1356 | rule = ET.Element('Rule') |
||
1357 | rule.set('id', self.id_) |
||
1358 | if self.prodtype != "all": |
||
1359 | rule.set("prodtype", self.prodtype) |
||
1360 | rule.set('severity', self.severity) |
||
1361 | add_sub_element(rule, 'title', self.title) |
||
1362 | add_sub_element(rule, 'description', self.description) |
||
1363 | add_sub_element(rule, 'rationale', self.rationale) |
||
1364 | |||
1365 | main_ident = ET.Element('ident') |
||
1366 | for ident_type, ident_val in self.identifiers.items(): |
||
1367 | # Product-qualified idents only remain here if items were not normalized |
||
1368 | if '@' in ident_type: |
||
1369 | # the ident is applicable only to some product |
||
1370 | # format: 'policy@product', e.g. 'stigid@product' |
||
1371 | # for such idents, we create a separate <ident> element |
||
1372 | policy, product = ident_type.split('@') |
||
1373 | ident = ET.SubElement(rule, 'ident') |
||
1374 | ident.set(policy, ident_val) |
||
1375 | ident.set('prodtype', product) |
||
1376 | else: |
||
1377 | main_ident.set(ident_type, ident_val) |
||
1378 | |||
1379 | if main_ident.attrib: |
||
1380 | rule.append(main_ident) |
||
1381 | |||
1382 | main_ref = ET.Element('ref') |
||
1383 | for ref_type, ref_val in self.references.items(): |
||
1384 | # Product-qualified references only remain here if items were not normalized |
||
1385 | if '@' in ref_type: |
||
1386 | # the reference is applicable only to some product |
||
1387 | # format: 'policy@product', e.g. 'stigid@product' |
||
1388 | # for such references, we create a separate <ref> element |
||
1389 | policy, product = ref_type.split('@') |
||
1390 | ref = ET.SubElement(rule, 'ref') |
||
1391 | ref.set(policy, ref_val) |
||
1392 | ref.set('prodtype', product) |
||
1393 | else: |
||
1394 | main_ref.set(ref_type, ref_val) |
||
1395 | |||
1396 | if main_ref.attrib: |
||
1397 | rule.append(main_ref) |
||
1398 | |||
1399 | if self.oval_external_content: |
||
1400 | check = ET.SubElement(rule, 'check') |
||
1401 | check.set("system", "http://oval.mitre.org/XMLSchema/oval-definitions-5") |
||
1402 | external_content = ET.SubElement(check, "check-content-ref") |
||
1403 | external_content.set("href", self.oval_external_content) |
||
1404 | else: |
||
1405 | # TODO: This is pretty much a hack: the OVAL ID will be the same as the rule ID, |
||
1406 | # and we don't want developers to have to keep the two in sync. |
||
1407 | # Therefore, just add an OVAL ref with that ID. |
||
1408 | oval_ref = ET.SubElement(rule, "oval") |
||
1409 | oval_ref.set("id", self.id_) |
||
1410 | |||
1411 | if self.ocil or self.ocil_clause: |
||
1412 | ocil = add_sub_element(rule, 'ocil', self.ocil if self.ocil else "") |
||
1413 | if self.ocil_clause: |
||
1414 | ocil.set("clause", self.ocil_clause) |
||
1415 | |||
1416 | add_warning_elements(rule, self.warnings) |
||
1417 | add_nondata_subelements(rule, "requires", "id", self.requires) |
||
1418 | add_nondata_subelements(rule, "conflicts", "id", self.conflicts) |
||
1419 | |||
1420 | for cpe_name in self.cpe_names: |
||
1421 | platform_el = ET.SubElement(rule, "platform") |
||
1422 | platform_el.set("idref", cpe_name) |
||
1423 | |||
1424 | return rule |
||
1425 | |||
1426 | 2 | def to_file(self, file_name): |
|
1427 | root = self.to_xml_element() |
||
1428 | tree = ET.ElementTree(root) |
||
1429 | tree.write(file_name) |
||
1430 | |||
1431 | |||
1432 | 2 | class DirectoryLoader(object): |
|
1433 | 2 | def __init__(self, profiles_dir, bash_remediation_fns, env_yaml): |
|
1434 | self.benchmark_file = None |
||
1435 | self.group_file = None |
||
1436 | self.loaded_group = None |
||
1437 | self.rule_files = [] |
||
1438 | self.value_files = [] |
||
1439 | self.subdirectories = [] |
||
1440 | |||
1441 | self.all_values = set() |
||
1442 | self.all_rules = set() |
||
1443 | self.all_groups = set() |
||
1444 | |||
1445 | self.profiles_dir = profiles_dir |
||
1446 | self.bash_remediation_fns = bash_remediation_fns |
||
1447 | self.env_yaml = env_yaml |
||
1448 | self.product = env_yaml["product"] |
||
1449 | |||
1450 | self.parent_group = None |
||
1451 | |||
1452 | 2 | def _collect_items_to_load(self, guide_directory): |
|
1453 | for dir_item in sorted(os.listdir(guide_directory)): |
||
1454 | dir_item_path = os.path.join(guide_directory, dir_item) |
||
1455 | _, extension = os.path.splitext(dir_item) |
||
1456 | |||
1457 | if extension == '.var': |
||
1458 | self.value_files.append(dir_item_path) |
||
1459 | elif dir_item == "benchmark.yml": |
||
1460 | if self.benchmark_file: |
||
1461 | raise ValueError("Multiple benchmarks in one directory") |
||
1462 | self.benchmark_file = dir_item_path |
||
1463 | elif dir_item == "group.yml": |
||
1464 | if self.group_file: |
||
1465 | raise ValueError("Multiple groups in one directory") |
||
1466 | self.group_file = dir_item_path |
||
1467 | elif extension == '.rule': |
||
1468 | self.rule_files.append(dir_item_path) |
||
1469 | elif is_rule_dir(dir_item_path): |
||
1470 | self.rule_files.append(get_rule_dir_yaml(dir_item_path)) |
||
1471 | elif dir_item != "tests": |
||
1472 | if os.path.isdir(dir_item_path): |
||
1473 | self.subdirectories.append(dir_item_path) |
||
1474 | else: |
||
1475 | sys.stderr.write( |
||
1476 | "Encountered file '%s' while recursing, extension '%s' " |
||
1477 | "is unknown. Skipping..\n" |
||
1478 | % (dir_item, extension) |
||
1479 | ) |
||
1480 | |||
1481 | 2 | def load_benchmark_or_group(self, guide_directory): |
|
1482 | """ |
||
1483 | Loads a given benchmark or group from the specified benchmark_file or |
||
1484 | group_file, in the context of guide_directory, profiles_dir, |
||
1485 | env_yaml, and bash_remediation_fns. |
||
1486 | |||
1487 | Returns the loaded group or benchmark. |
||
1488 | """ |
||
1489 | group = None |
||
1490 | if self.group_file and self.benchmark_file: |
||
1491 | raise ValueError("Both a benchmark.yml and a group.yml file were found in " |
||
1492 | "the same directory '%s'" % (guide_directory)) |
||
1493 | |||
1494 | # we treat benchmark as a special form of group in the following code |
||
1495 | if self.benchmark_file: |
||
1496 | group = Benchmark.from_yaml( |
||
1497 | self.benchmark_file, 'product-name', self.env_yaml |
||
1498 | ) |
||
1499 | if self.profiles_dir: |
||
1500 | group.add_profiles_from_dir(self.profiles_dir, self.env_yaml) |
||
1501 | group.add_bash_remediation_fns_from_file(self.bash_remediation_fns) |
||
1502 | |||
1503 | if self.group_file: |
||
1504 | group = Group.from_yaml(self.group_file, self.env_yaml) |
||
1505 | self.all_groups.add(group.id_) |
||
1506 | |||
1507 | return group |
||
1508 | |||
1509 | 2 | def _load_group_process_and_recurse(self, guide_directory): |
|
1510 | self.loaded_group = self.load_benchmark_or_group(guide_directory) |
||
1511 | |||
1512 | if self.loaded_group: |
||
1513 | if self.parent_group: |
||
1514 | self.parent_group.add_group(self.loaded_group, env_yaml=self.env_yaml) |
||
1515 | |||
1516 | self._process_values() |
||
1517 | self._recurse_into_subdirs() |
||
1518 | self._process_rules() |
||
1519 | |||
1520 | 2 | def process_directory_tree(self, start_dir, extra_group_dirs=None): |
|
1521 | self._collect_items_to_load(start_dir) |
||
1522 | if extra_group_dirs is not None: |
||
1523 | self.subdirectories += extra_group_dirs |
||
1524 | self._load_group_process_and_recurse(start_dir) |
||
1525 | |||
1526 | 2 | def _recurse_into_subdirs(self): |
|
1527 | for subdir in self.subdirectories: |
||
1528 | loader = self._get_new_loader() |
||
1529 | loader.parent_group = self.loaded_group |
||
1530 | loader.process_directory_tree(subdir) |
||
1531 | self.all_values.update(loader.all_values) |
||
1532 | self.all_rules.update(loader.all_rules) |
||
1533 | self.all_groups.update(loader.all_groups) |
||
1534 | |||
1535 | 2 | def _get_new_loader(self): |
|
1536 | raise NotImplementedError() |
||
1537 | |||
1538 | 2 | def _process_values(self): |
|
1539 | raise NotImplementedError() |
||
1540 | |||
1541 | 2 | def _process_rules(self): |
|
1542 | raise NotImplementedError() |
||
1543 | |||
1544 | |||
1545 | 2 | class BuildLoader(DirectoryLoader): |
|
1546 | 2 | def __init__(self, profiles_dir, bash_remediation_fns, env_yaml, resolved_rules_dir=None): |
|
1547 | super(BuildLoader, self).__init__(profiles_dir, bash_remediation_fns, env_yaml) |
||
1548 | |||
1549 | self.resolved_rules_dir = resolved_rules_dir |
||
1550 | if resolved_rules_dir and not os.path.isdir(resolved_rules_dir): |
||
1551 | os.mkdir(resolved_rules_dir) |
||
1552 | |||
1553 | 2 | def _process_values(self): |
|
1554 | for value_yaml in self.value_files: |
||
1555 | value = Value.from_yaml(value_yaml, self.env_yaml) |
||
1556 | self.all_values.add(value) |
||
1557 | self.loaded_group.add_value(value) |
||
1558 | |||
1559 | 2 | def _process_rules(self): |
|
1560 | for rule_yaml in self.rule_files: |
||
1561 | try: |
||
1562 | rule = Rule.from_yaml(rule_yaml, self.env_yaml) |
||
1563 | except DocumentationNotComplete: |
||
1564 | # Happens in a non-debug build when a rule is "documentation-incomplete" |
||
1565 | continue |
||
1566 | prodtypes = parse_prodtype(rule.prodtype) |
||
1567 | if "all" not in prodtypes and self.product not in prodtypes: |
||
1568 | continue |
||
1569 | self.all_rules.add(rule) |
||
1570 | self.loaded_group.add_rule(rule, env_yaml=self.env_yaml) |
||
1571 | |||
1572 | if self.loaded_group.platforms: |
||
1573 | rule.inherited_platforms += self.loaded_group.platforms |
||
1574 | |||
1575 | if self.resolved_rules_dir: |
||
1576 | output_for_rule = os.path.join( |
||
1577 | self.resolved_rules_dir, "{id_}.yml".format(id_=rule.id_)) |
||
1578 | mkdir_p(self.resolved_rules_dir) |
||
1579 | with open(output_for_rule, "w") as f: |
||
1580 | rule.normalize(self.env_yaml["product"]) |
||
1581 | yaml.dump(rule.to_contents_dict(), f) |
||
1582 | |||
1583 | 2 | def _get_new_loader(self): |
|
1584 | return BuildLoader( |
||
1585 | self.profiles_dir, self.bash_remediation_fns, self.env_yaml, self.resolved_rules_dir) |
||
1586 | |||
1587 | 2 | def export_group_to_file(self, filename): |
|
1588 | return self.loaded_group.to_file(filename) |
||
1589 |
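
Putting the pieces together, a typical build instantiates BuildLoader once, points it at the root of a guide tree, and exports the resolved benchmark. The call sequence below is a hypothetical usage sketch: the paths and the minimal env_yaml are invented, and a real build passes considerably more product configuration.

    env_yaml = {"product": "rhel8"}  # a real env.yaml carries many more keys
    loader = BuildLoader(
        profiles_dir="rhel8/profiles",
        bash_remediation_fns="build/bash-remediation-functions.xml",
        env_yaml=env_yaml,
        resolved_rules_dir="build/rhel8/rules",
    )
    loader.process_directory_tree("linux_os/guide")
    loader.export_group_to_file("build/rhel8/benchmark.xml")
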