1 | #!/usr/bin/python3 |
||
2 | |||
3 | import os |
||
4 | import re |
||
5 | import sys |
||
6 | |||
7 | from collections import OrderedDict |
||
8 | |||
9 | import ssg.rules |
||
10 | import ssg.utils |
||
11 | from ssg.utils import mkdir_p |
||
12 | import ssg.yaml |
||
13 | import ssg.build_yaml |
||
14 | import ssg.build_remediations |
||
15 | import ssg.environment |
||
16 | import ssg.products |
||
17 | |||
# Names of remediation metadata keys (presumably the "# key = value" comment
# lines found in a remediation file -- TODO confirm against the consumer).
# NOTE(review): not referenced anywhere in the visible part of this file.
COMMENTS_TO_PARSE = ["strategy", "complexity", "disruption"]
||
19 | |||
20 | |||
class PlaybookBuilder():
    """
    Builds one Ansible Playbook per rule out of Ansible remediation snippets.

    Snippets are read from ``input_dir`` (``<rule_id>.yml``), rule titles from
    ``rules_dir`` (``<rule_id>.yml``) and profiles from ``profiles_dir``
    (``*.profile``). Playbooks are written into per-profile subdirectories of
    ``output_dir``.
    """

    # Matches the "(xccdf-var <variable_id>)" placeholder items of a snippet.
    # Compiled once here instead of on every get_data_from_snippet() call.
    _XCCDF_VAR_PATTERN = re.compile(r"\(xccdf-var\s+(\S+)\)")

    def __init__(self, product_yaml_path, input_dir, output_dir, rules_dir, profiles_dir,
                 build_config_yaml):
        """
        Store the build locations and resolve the product content directories.

        product_yaml_path -- path of the product YAML; loaded here and later
                             passed to ssg.environment.open_environment
        input_dir         -- directory with Ansible snippets (<rule_id>.yml)
        output_dir        -- directory under which Playbooks are written
        rules_dir         -- directory with rule files (<rule_id>.yml)
        profiles_dir      -- directory with profile files (*.profile)
        build_config_yaml -- passed to ssg.environment.open_environment
        """
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.rules_dir = rules_dir
        self.build_config_yaml = build_config_yaml
        self.product_yaml_path = product_yaml_path
        product_yaml = ssg.products.Product(product_yaml_path)
        product_dir = product_yaml["product_dir"]
        relative_guide_dir = ssg.utils.required_key(product_yaml,
                                                    "benchmark_root")
        # Both the benchmark root and the additional content directories are
        # given relative to the product directory.
        self.guide_dir = os.path.abspath(os.path.join(product_dir,
                                                      relative_guide_dir))
        self.profiles_dir = profiles_dir
        additional_content_directories = product_yaml.get(
            "additional_content_directories", [])
        self.add_content_dirs = [os.path.abspath(os.path.join(product_dir, rd))
                                 for rd in additional_content_directories]

    def choose_variable_value(self, var_id, variables, refinements):
        """
        Determine value of variable based on profile refinements.

        var_id      -- ID of the variable to resolve
        variables   -- dictionary mapping variable IDs to dictionaries of
                       selector -> value
        refinements -- dictionary mapping variable IDs to the selector chosen
                       by the profile; may be None or empty

        Returns the value stored under the refined selector, or under
        "default" when the variable is not refined.

        Raises ValueError if the variable is unknown, has no values at all,
        or the requested selector can't be resolved.
        """
        if refinements and var_id in refinements:
            selector = refinements[var_id]
        else:
            selector = "default"
        try:
            options = variables[var_id]
        except KeyError:
            raise ValueError("Variable '%s' doesn't exist." % var_id)
        try:
            return options[selector]
        except KeyError:
            pass
        if not options:
            # Without this check the fallback below would fail with a bare
            # IndexError/StopIteration that doesn't name the variable.
            raise ValueError("Variable '%s' doesn't have any value." % var_id)
        if len(options) == 1 or selector == "default":
            # We will assume that if there is just one option that it could
            # be selected as a default option.
            # If no 'default' selector is present in the XCCDF value,
            # choose the first (consistent with oscap behavior)
            return next(iter(options.values()))
        raise ValueError(
            "Selector '%s' doesn't exist in variable '%s'. "
            "Available selectors: %s." %
            # str() so that non-string selectors can't turn this report
            # into a TypeError.
            (selector, var_id, ", ".join(str(key) for key in options))
        )

    def get_data_from_snippet(self, snippet_yaml, variables, refinements):
        """
        Extracts and resolves tasks and variables from Ansible snippet.

        snippet_yaml -- parsed snippet: a sequence whose string items are
                        "(xccdf-var <id>)" placeholders and whose other items
                        are tasks. None is treated as an empty snippet
                        (presumably what an empty YAML document is loaded as).
        variables    -- see choose_variable_value
        refinements  -- see choose_variable_value

        Returns a tuple (tasks, values) where tasks is a list of the task
        items and values maps each referenced variable ID to its resolved
        value.

        Raises ValueError on a string item that isn't a variable placeholder
        or when a variable can't be resolved.
        """
        tasks = []
        values = dict()
        for item in snippet_yaml or []:
            if not isinstance(item, str):
                tasks.append(item)
                continue
            match = self._XCCDF_VAR_PATTERN.match(item)
            if not match:
                raise ValueError("Found unknown item '%s'" % item)
            var_id = match.group(1)
            values[var_id] = self.choose_variable_value(var_id, variables,
                                                        refinements)
        return tasks, values

    def get_benchmark_variables(self):
        """
        Get all variables, their selectors and values used in a given
        benchmark. Returns a dictionary where keys are variable IDs and
        values are dictionaries where keys are selectors and values are
        variable values.

        The guide directory is processed first, then the additional content
        directories in order; a later definition of the same variable ID
        replaces an earlier one.
        """
        variables = dict()
        for cur_dir in [self.guide_dir] + self.add_content_dirs:
            variables.update(self._get_rules_variables(cur_dir))
        return variables

    def _get_rules_variables(self, base_dir):
        """
        Walk base_dir and yield (variable_id, options) for every "*.var" file,
        where options maps selectors to values, both converted to strings.
        """
        for dirpath, dirnames, filenames in os.walk(base_dir):
            # Sorting in place makes the traversal order deterministic.
            dirnames.sort()
            filenames.sort()
            for filename in filenames:
                if os.path.splitext(filename)[1] != ".var":
                    continue
                full_path = os.path.join(dirpath, filename)
                xccdf_value = ssg.build_yaml.Value.from_yaml(full_path)
                # Make sure that selectors and values are strings
                options = dict(
                    (str(k), str(v)) for k, v in xccdf_value.options.items())
                yield (xccdf_value.id_, options,)

    def _find_rule_title(self, rule_id):
        """
        Return the "title" entry of the rule file <rules_dir>/<rule_id>.yml.
        """
        rule_path = os.path.join(self.rules_dir, rule_id + ".yml")
        rule_yaml = ssg.yaml.open_raw(rule_path)
        return rule_yaml["title"]

    def create_playbook(self, snippet_path, rule_id, variables,
                        refinements, output_dir):
        """
        Creates a Playbook from Ansible snippet for the given rule specified
        by rule ID, fills in the profile values and saves it into output_dir.

        The Playbook is written to <output_dir>/<rule_id>.yml. Tags are
        removed from the individual tasks and set, sorted and without
        duplicates, on the play instead.

        Raises ValueError if the snippet doesn't contain any task.
        """

        with open(snippet_path, "r") as snippet_file:
            snippet_str = snippet_file.read()
        fix = ssg.build_remediations.split_remediation_content_and_metadata(snippet_str)
        snippet_yaml = ssg.yaml.ordered_load(fix.contents)

        play_tasks, play_vars = self.get_data_from_snippet(
            snippet_yaml, variables, refinements
        )

        if not play_tasks:
            raise ValueError(
                "Ansible remediation for rule '%s' in '%s' "
                "doesn't contain any task." %
                (rule_id, snippet_path)
            )

        tags = set()
        for task in play_tasks:
            # pop() on purpose: the tags move from the task to the play.
            tags.update(task.pop("tags", []))

        play = OrderedDict()
        play["name"] = self._find_rule_title(rule_id)
        play["hosts"] = "@@HOSTS@@"
        play["become"] = True
        if play_vars:
            play["vars"] = play_vars
        if tags:
            play["tags"] = sorted(tags)
        play["tasks"] = play_tasks

        playbook = [play]
        playbook_path = os.path.join(output_dir, rule_id + ".yml")
        with open(playbook_path, "w") as playbook_file:
            # write remediation metadata (complexity, strategy, etc.) first
            for k, v in fix.config.items():
                playbook_file.write("# %s = %s\n" % (k, v))
            ssg.yaml.ordered_dump(
                playbook, playbook_file, default_flow_style=False
            )

    def open_profile(self, profile_path):
        """
        Opens and parses profile at the given profile_path.

        Raises RuntimeError if the path isn't a file, doesn't have the
        ".profile" extension or the profile can't be parsed.
        """
        if not os.path.isfile(profile_path):
            raise RuntimeError("'%s' is not a file!\n" % profile_path)
        profile_id, ext = os.path.splitext(os.path.basename(profile_path))
        if ext != ".profile":
            raise RuntimeError(
                "Encountered file '%s' while looking for profiles, "
                "extension '%s' is unknown. Skipping..\n"
                % (profile_path, ext)
            )

        env_yaml = ssg.environment.open_environment(self.build_config_yaml, self.product_yaml_path)
        profile = ssg.build_yaml.Profile.from_yaml(profile_path, env_yaml)
        if not profile:
            raise RuntimeError("Could not parse profile %s.\n" % profile_path)
        return profile

    def create_playbooks_for_all_rules_in_profile(self, profile, variables):
        """
        Creates a Playbook for each rule selected in a profile from tasks
        extracted from snippets. Created Playbooks are parametrized by
        variables according to profile selection. Playbooks are written into
        a new subdirectory in output_dir.

        Rules without a snippet in input_dir are silently skipped.
        """
        profile_rules = profile.get_rule_selectors()
        profile_refines = profile.get_variable_selectors()

        profile_playbooks_dir = os.path.join(self.output_dir, profile.id_)
        mkdir_p(profile_playbooks_dir)

        for rule_id in profile_rules:
            snippet_path = os.path.join(self.input_dir, rule_id + ".yml")
            if os.path.exists(snippet_path):
                self.create_playbook(
                    snippet_path, rule_id, variables,
                    profile_refines, profile_playbooks_dir
                )

    def create_playbook_for_single_rule(self, profile, rule_id, variables):
        """
        Creates a Playbook for given rule specified by a rule_id. Created
        Playbooks are parametrized by variables according to profile selection.
        Playbooks are written into a new subdirectory in output_dir.

        Raises ValueError if the rule isn't selected by the profile.
        """
        profile_rules = profile.get_rule_selectors()
        profile_refines = profile.get_variable_selectors()
        profile_playbooks_dir = os.path.join(self.output_dir, profile.id_)
        mkdir_p(profile_playbooks_dir)
        snippet_path = os.path.join(self.input_dir, rule_id + ".yml")
        if rule_id not in profile_rules:
            raise ValueError("Rule '%s' isn't part of profile '%s'" %
                             (rule_id, profile.id_))
        self.create_playbook(
            snippet_path, rule_id, variables,
            profile_refines, profile_playbooks_dir
        )

    def create_playbooks_for_all_rules(self, variables):
        """
        Creates a Playbook for every entry of rules_dir that has a snippet in
        input_dir. No profile refinements are applied. Playbooks are written
        into the "all" subdirectory of output_dir.
        """
        profile_playbooks_dir = os.path.join(self.output_dir, "all")
        mkdir_p(profile_playbooks_dir)
        for rule in sorted(os.listdir(self.rules_dir)):
            rule_id, _ = os.path.splitext(rule)
            snippet_path = os.path.join(self.input_dir, rule_id + ".yml")
            if not os.path.exists(snippet_path):
                continue
            self.create_playbook(
                snippet_path, rule_id, variables,
                None, profile_playbooks_dir
            )

    def build(self, profile_id=None, rule_id=None):
        """
        Creates Playbooks for a specified profile.
        If profile is not given, creates playbooks for all profiles
        in the product.
        If the rule_id is not given, Playbooks are created for every rule.

        Profiles that can't be opened are reported on stderr and skipped.
        Raises KeyError if profile_id is given but no such profile was
        loaded from profiles_dir.
        """
        variables = self.get_benchmark_variables()
        profiles = {}
        for profile_file in sorted(os.listdir(self.profiles_dir)):
            profile_path = os.path.join(self.profiles_dir, profile_file)
            try:
                profile = self.open_profile(profile_path)
            except ssg.yaml.DocumentationNotComplete:
                msg = ("Skipping incomplete profile {0}. To include incomplete "
                       "profiles, build in debug mode.\n")
                sys.stderr.write(msg.format(profile_path))
                continue
            except RuntimeError as e:
                sys.stderr.write(str(e))
                continue
            profiles[profile.id_] = profile

        if profile_id:
            to_process = [profile_id]
        else:
            to_process = list(profiles)

        for p in to_process:
            profile = profiles[p]
            if rule_id:
                self.create_playbook_for_single_rule(profile, rule_id,
                                                     variables)
            else:
                self.create_playbooks_for_all_rules_in_profile(
                    profile, variables)

        if not profile_id:
            # build playbooks for virtual '(all)' profile
            # this virtual profile contains all rules in the product
            self.create_playbooks_for_all_rules(variables)
||
287 |