Issues (70)

ssg/playbook_builder.py (1 issue)

1
#!/usr/bin/python3
2
3
import os
4
import re
5
import sys
6
7
from collections import OrderedDict
8
9
import ssg.rules
10
import ssg.utils
11
from ssg.utils import mkdir_p
12
import ssg.yaml
13
import ssg.build_yaml
14
import ssg.build_remediations
15
import ssg.environment
16
import ssg.products
17
18
COMMENTS_TO_PARSE = ["strategy", "complexity", "disruption"]
19
20
21
class PlaybookBuilder():
22
    def __init__(self, product_yaml_path, input_dir, output_dir, rules_dir, profiles_dir,
23
                 build_config_yaml):
24
        self.input_dir = input_dir
25
        self.output_dir = output_dir
26
        self.rules_dir = rules_dir
27
        self.build_config_yaml = build_config_yaml
28
        self.product_yaml_path = product_yaml_path
29
        product_yaml = ssg.products.Product(product_yaml_path)
30
        product_dir = product_yaml["product_dir"]
31
        relative_guide_dir = ssg.utils.required_key(product_yaml,
32
                                                    "benchmark_root")
33
        self.guide_dir = os.path.abspath(os.path.join(product_dir,
34
                                                      relative_guide_dir))
35
        self.profiles_dir = profiles_dir
36
        additional_content_directories = product_yaml.get(
37
            "additional_content_directories", [])
38
        self.add_content_dirs = [os.path.abspath(os.path.join(product_dir, rd))
39
                                 for rd in additional_content_directories]
40
41
    def choose_variable_value(self, var_id, variables, refinements):
42
        """
43
        Determine value of variable based on profile refinements.
44
        """
45
        if refinements and var_id in refinements:
46
            selector = refinements[var_id]
47
        else:
48
            selector = "default"
49
        try:
50
            options = variables[var_id]
51
        except KeyError:
52
            raise ValueError("Variable '%s' doesn't exist." % var_id)
53
        try:
54
            value = options[selector]
55
        except KeyError:
56
            if len(options.keys()) == 1:
57
                # We will assume that if there is just one option that it could
58
                # be selected as a default option.
59
                value = list(options.values())[0]
60
            elif selector == "default":
61
                # If no 'default' selector is present in the XCCDF value,
62
                # choose the first (consistent with oscap behavior)
63
                value = list(options.values())[0]
64
            else:
65
                raise ValueError(
66
                    "Selector '%s' doesn't exist in variable '%s'. "
67
                    "Available selectors: %s." %
68
                    (selector, var_id, ", ".join(options.keys()))
69
                )
70
        return value
71
72
    def get_data_from_snippet(self, snippet_yaml, variables, refinements):
73
        """
74
        Extracts and resolves tasks and variables from Ansible snippet.
75
        """
76
        xccdf_var_pattern = re.compile(r"\(xccdf-var\s+(\S+)\)")
77
        tasks = []
78
        values = dict()
79
        for item in snippet_yaml:
80
            if isinstance(item, str):
81
                match = xccdf_var_pattern.match(item)
82
                if match:
83
                    var_id = match.group(1)
84
                    value = self.choose_variable_value(var_id, variables,
85
                                                       refinements)
86
                    values[var_id] = value
87
                else:
88
                    raise ValueError("Found unknown item '%s'" % item)
89
            else:
90
                tasks.append(item)
91
        return tasks, values
92
93
    def get_benchmark_variables(self):
94
        """
95
        Get all variables, their selectors and values used in a given
96
        benchmark. Returns a dictionary where keys are variable IDs and
97
        values are dictionaries where keys are selectors and values are
98
        variable values.
99
        """
100
        variables = dict()
101
        for cur_dir in [self.guide_dir] + self.add_content_dirs:
102
            variables.update(self._get_rules_variables(cur_dir))
103
        return variables
104
105
    def _get_rules_variables(self, base_dir):
106
        for dirpath, dirnames, filenames in os.walk(base_dir):
107
            dirnames.sort()
108
            filenames.sort()
109
            for filename in filenames:
110
                root, ext = os.path.splitext(filename)
111
                if ext == ".var":
112
                    full_path = os.path.join(dirpath, filename)
113
                    xccdf_value = ssg.build_yaml.Value.from_yaml(full_path)
114
                    # Make sure that selectors and values are strings
115
                    options = dict()
116
                    for k, v in xccdf_value.options.items():
117
                        options[str(k)] = str(v)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable str does not seem to be defined.
Loading history...
118
                    yield (xccdf_value.id_, options,)
119
120
    def _find_rule_title(self, rule_id):
121
        rule_path = os.path.join(self.rules_dir, rule_id + ".yml")
122
        rule_yaml = ssg.yaml.open_raw(rule_path)
123
        return rule_yaml["title"]
124
125
    def create_playbook(self, snippet_path, rule_id, variables,
126
                        refinements, output_dir):
127
        """
128
        Creates a Playbook from Ansible snippet for the given rule specified
129
        by rule ID, fills in the profile values and saves it into output_dir.
130
        """
131
132
        with open(snippet_path, "r") as snippet_file:
133
            snippet_str = snippet_file.read()
134
        fix = ssg.build_remediations.split_remediation_content_and_metadata(snippet_str)
135
        snippet_yaml = ssg.yaml.ordered_load(fix.contents)
136
137
        play_tasks, play_vars = self.get_data_from_snippet(
138
            snippet_yaml, variables, refinements
139
        )
140
141
        if len(play_tasks) == 0:
142
            raise ValueError(
143
                "Ansible remediation for rule '%s' in '%s' "
144
                "doesn't contain any task." %
145
                (rule_id, snippet_path)
146
            )
147
148
        tags = set()
149
        for task in play_tasks:
150
            tags |= set(task.pop("tags", []))
151
152
        play = OrderedDict()
153
        play["name"] = self._find_rule_title(rule_id)
154
        play["hosts"] = "@@HOSTS@@"
155
        play["become"] = True
156
        if len(play_vars) > 0:
157
            play["vars"] = play_vars
158
        if len(tags) > 0:
159
            play["tags"] = sorted(list(tags))
160
        play["tasks"] = play_tasks
161
162
        playbook = [play]
163
        playbook_path = os.path.join(output_dir, rule_id + ".yml")
164
        with open(playbook_path, "w") as playbook_file:
165
            # write remediation metadata (complexity, strategy, etc.) first
166
            for k, v in fix.config.items():
167
                playbook_file.write("# %s = %s\n" % (k, v))
168
            ssg.yaml.ordered_dump(
169
                playbook, playbook_file, default_flow_style=False
170
            )
171
172
    def open_profile(self, profile_path):
173
        """
174
        Opens and parses profile at the given profile_path.
175
        """
176
        if not os.path.isfile(profile_path):
177
            raise RuntimeError("'%s' is not a file!\n" % profile_path)
178
        profile_id, ext = os.path.splitext(os.path.basename(profile_path))
179
        if ext != ".profile":
180
            raise RuntimeError(
181
                "Encountered file '%s' while looking for profiles, "
182
                "extension '%s' is unknown. Skipping..\n"
183
                % (profile_path, ext)
184
            )
185
186
        env_yaml = ssg.environment.open_environment(self.build_config_yaml, self.product_yaml_path)
187
        profile = ssg.build_yaml.Profile.from_yaml(profile_path, env_yaml)
188
        if not profile:
189
            raise RuntimeError("Could not parse profile %s.\n" % profile_path)
190
        return profile
191
192
    def create_playbooks_for_all_rules_in_profile(self, profile, variables):
193
        """
194
        Creates a Playbook for each rule selected in a profile from tasks
195
        extracted from snippets. Created Playbooks are parametrized by
196
        variables according to profile selection. Playbooks are written into
197
        a new subdirectory in output_dir.
198
        """
199
        profile_rules = profile.get_rule_selectors()
200
        profile_refines = profile.get_variable_selectors()
201
202
        profile_playbooks_dir = os.path.join(self.output_dir, profile.id_)
203
        mkdir_p(profile_playbooks_dir)
204
205
        for rule_id in profile_rules:
206
            snippet_path = os.path.join(self.input_dir, rule_id + ".yml")
207
            if os.path.exists(snippet_path):
208
                self.create_playbook(
209
                    snippet_path, rule_id, variables,
210
                    profile_refines, profile_playbooks_dir
211
                )
212
213
    def create_playbook_for_single_rule(self, profile, rule_id, variables):
214
        """
215
        Creates a Playbook for given rule specified by a rule_id. Created
216
        Playbooks are parametrized by variables according to profile selection.
217
        Playbooks are written into a new subdirectory in output_dir.
218
        """
219
        profile_rules = profile.get_rule_selectors()
220
        profile_refines = profile.get_variable_selectors()
221
        profile_playbooks_dir = os.path.join(self.output_dir, profile.id_)
222
        mkdir_p(profile_playbooks_dir)
223
        snippet_path = os.path.join(self.input_dir, rule_id + ".yml")
224
        if rule_id in profile_rules:
225
            self.create_playbook(
226
                snippet_path, rule_id, variables,
227
                profile_refines, profile_playbooks_dir
228
            )
229
        else:
230
            raise ValueError("Rule '%s' isn't part of profile '%s'" %
231
                             (rule_id, profile.id_))
232
233
    def create_playbooks_for_all_rules(self, variables):
234
        profile_playbooks_dir = os.path.join(self.output_dir, "all")
235
        mkdir_p(profile_playbooks_dir)
236
        for rule in sorted(os.listdir(self.rules_dir)):
237
            rule_id, _ = os.path.splitext(rule)
238
            snippet_path = os.path.join(self.input_dir, rule_id + ".yml")
239
            if not os.path.exists(snippet_path):
240
                continue
241
            self.create_playbook(
242
                snippet_path, rule_id, variables,
243
                None, profile_playbooks_dir
244
            )
245
246
    def build(self, profile_id=None, rule_id=None):
247
        """
248
        Creates Playbooks for a specified profile.
249
        If profile is not given, creates playbooks for all profiles
250
        in the product.
251
        If the rule_id is not given, Playbooks are created for every rule.
252
        """
253
        variables = self.get_benchmark_variables()
254
        profiles = {}
255
        for profile_file in sorted(os.listdir(self.profiles_dir)):
256
            profile_path = os.path.join(self.profiles_dir, profile_file)
257
            try:
258
                profile = self.open_profile(profile_path)
259
            except ssg.yaml.DocumentationNotComplete as e:
260
                msg = "Skipping incomplete profile {0}. To include incomplete " + \
261
                    "profiles, build in debug mode.\n"
262
                sys.stderr.write(msg.format(profile_path))
263
                continue
264
            except RuntimeError as e:
265
                sys.stderr.write(str(e))
266
                continue
267
            profiles[profile.id_] = profile
268
269
        if profile_id:
270
            to_process = [profile_id]
271
        else:
272
            to_process = list(profiles.keys())
273
274
        for p in to_process:
275
            profile = profiles[p]
276
            if rule_id:
277
                self.create_playbook_for_single_rule(profile, rule_id,
278
                                                     variables)
279
            else:
280
                self.create_playbooks_for_all_rules_in_profile(
281
                    profile, variables)
282
283
        if not profile_id:
284
            # build playbooks for virtual '(all)' profile
285
            # this virtual profile contains all rules in the product
286
            self.create_playbooks_for_all_rules(variables)
287