Passed
Push — master ( 4c49b2...31e49c )
by Matěj
04:23 queued 11s
created

PlaybookBuilder.create_playbook()   B

Complexity

Conditions 8

Size

Total Lines 45
Code Lines 31

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 25
CRAP Score 8.0036

Importance

Changes 0
Metric Value
cc 8
eloc 31
nop 6
dl 0
loc 45
ccs 25
cts 26
cp 0.9615
crap 8.0036
rs 7.2693
c 0
b 0
f 0
1
#!/usr/bin/python3
2
3 2
import os
4 2
import re
5 2
import sys
6
7 2
from collections import OrderedDict
8
9 2
import ssg.rules
10 2
import ssg.utils
11 2
import ssg.yaml
12 2
import ssg.build_yaml
13 2
import ssg.build_remediations
14
15 2
COMMENTS_TO_PARSE = ["strategy", "complexity", "disruption"]
16
17
18 2
class PlaybookBuilder():
19 2
    def __init__(self, product_yaml_path, input_dir, output_dir, rules_dir):
20 2
        self.input_dir = input_dir
21 2
        self.output_dir = output_dir
22 2
        self.rules_dir = rules_dir
23 2
        product_yaml = ssg.yaml.open_raw(product_yaml_path)
24 2
        product_dir = os.path.dirname(product_yaml_path)
25 2
        relative_guide_dir = ssg.utils.required_key(product_yaml,
26
                                                    "benchmark_root")
27 2
        self.guide_dir = os.path.abspath(os.path.join(product_dir,
28
                                                      relative_guide_dir))
29 2
        relative_profiles_dir = ssg.utils.required_key(product_yaml,
30
                                                       "profiles_root")
31 2
        self.profiles_dir = os.path.abspath(
32
            os.path.join(product_dir, relative_profiles_dir))
33
34 2
    def get_profile_selections(self, profile):
35
        """
36
        Provides a tuple (rules, variables) where rules is a list of rules
37
        selected in the profile and variables is a dictionary of
38
        variables and their values in the profile. The method can handle
39
        profiles which extend other profiles.
40
        """
41 2
        rules = []
42 2
        variables = dict()
43 2
        if profile.extends:
44
            extended_profile_path = os.path.join(
45
                self.profiles_dir, profile.extends + ".profile")
46
            extended_profile = ssg.build_yaml.Profile.from_yaml(
47
                extended_profile_path)
48
            if not profile:
49
                sys.stderr.write(
50
                    "Could not parse profile %s.\n" % extended_profile_path)
51
                return None
52
            rules, variables = self.get_profile_selections(extended_profile)
53 2
        rules.extend(profile.get_rule_selectors())
54 2
        variables.update(profile.get_variable_selectors())
55 2
        return rules, variables
56
57 2
    def choose_variable_value(self, var_id, variables, refinements):
58
        """
59
        Determine value of variable based on profile refinements.
60
        """
61 2
        if var_id in refinements:
62 2
            selector = refinements[var_id]
63
        else:
64
            selector = "default"
65 2
        try:
66 2
            options = variables[var_id]
67
        except KeyError:
68
            raise ValueError("Variable '%s' doesn't exist." % var_id)
69 2
        try:
70 2
            value = options[selector]
71
        except KeyError:
72
            if len(options.keys()) == 1:
73
                # We will assume that if there is just one option that it could
74
                # be selected as a default option.
75
                value = list(options.values())[0]
76
            else:
77
                raise ValueError(
78
                    "Selector '%s' doesn't exist in variable '%s'. "
79
                    "Available selectors: %s." %
80
                    (selector, var_id, ", ".join(options.keys()))
81
                )
82 2
        return value
83
84 2
    def get_data_from_snippet(self, snippet_yaml, variables, refinements):
85
        """
86
        Extracts and resolves tasks and variables from Ansible snippet.
87
        """
88 2
        xccdf_var_pattern = re.compile(r"\(xccdf-var\s+(\S+)\)")
89 2
        tasks = []
90 2
        values = dict()
91 2
        for item in snippet_yaml:
92 2
            if isinstance(item, str):
93 2
                match = xccdf_var_pattern.match(item)
94 2
                if match:
95 2
                    var_id = match.group(1)
96 2
                    value = self.choose_variable_value(var_id, variables,
97
                                                       refinements)
98 2
                    values[var_id] = value
99
                else:
100
                    raise ValueError("Found unknown item '%s'" % item)
101
            else:
102 2
                tasks.append(item)
103 2
        return tasks, values
104
105 2
    def get_benchmark_variables(self):
106
        """
107
        Get all variables, their selectors and values used in a given
108
        benchmark. Returns a dictionary where keys are variable IDs and
109
        values are dictionaries where keys are selectors and values are
110
        variable values.
111
        """
112 2
        variables = dict()
113 2
        for dirpath, dirnames, filenames in os.walk(self.guide_dir):
114 2
            for filename in filenames:
115 2
                root, ext = os.path.splitext(filename)
116 2
                if ext == ".var":
117 2
                    full_path = os.path.join(dirpath, filename)
118 2
                    xccdf_value = ssg.build_yaml.Value.from_yaml(full_path)
119
                    # Make sure that selectors and values are strings
120 2
                    options = dict()
121 2
                    for k, v in xccdf_value.options.items():
122 2
                        options[str(k)] = str(v)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable str does not seem to be defined.
Loading history...
123 2
                    variables[xccdf_value.id_] = options
124 2
        return variables
125
126 2
    def _find_rule_title(self, rule_id):
127 2
        rule_path = os.path.join(self.rules_dir, rule_id + ".yml")
128 2
        rule_yaml = ssg.yaml.open_raw(rule_path)
129 2
        return rule_yaml["title"]
130
131 2
    def create_playbook(self, snippet_path, rule_id, variables,
132
                        refinements, output_dir):
133
        """
134
        Creates a Playbook from Ansible snippet for the given rule specified
135
        by rule ID, fills in the profile values and saves it into output_dir.
136
        """
137
138 2
        with open(snippet_path, "r") as snippet_file:
139 2
            snippet_str = snippet_file.read()
140 2
        fix = ssg.build_remediations.split_remediation_content_and_metadata(snippet_str)
141 2
        snippet_yaml = ssg.yaml.ordered_load(fix.contents)
142
143 2
        play_tasks, play_vars = self.get_data_from_snippet(
144
            snippet_yaml, variables, refinements
145
        )
146
147 2
        if len(play_tasks) == 0:
148
            raise ValueError(
149
                "Ansible remediation for rule '%s' in '%s' "
150
                "doesn't contain any task." %
151
                (rule_id, snippet_path)
152
            )
153
154 2
        tags = set()
155 2
        for task in play_tasks:
156 2
            tags |= set(task.pop("tags", []))
157
158 2
        play = OrderedDict()
159 2
        play["name"] = self._find_rule_title(rule_id)
160 2
        play["hosts"] = "@@HOSTS@@"
161 2
        play["become"] = True
162 2
        if len(play_vars) > 0:
163 2
            play["vars"] = play_vars
164 2
        if len(tags) > 0:
165 2
            play["tags"] = sorted(list(tags))
166 2
        play["tasks"] = play_tasks
167
168 2
        playbook = [play]
169 2
        playbook_path = os.path.join(output_dir, rule_id + ".yml")
170 2
        with open(playbook_path, "w") as playbook_file:
171
            # write remediation metadata (complexity, strategy, etc.) first
172 2
            for k, v in fix.config.items():
173 2
                playbook_file.write("# %s = %s\n" % (k, v))
174 2
            ssg.yaml.ordered_dump(
175
                playbook, playbook_file, default_flow_style=False
176
            )
177
178 2
    def open_profile(self, profile_path):
179
        """
180
        Opens and parses profile at the given profile_path.
181
        """
182 2
        if not os.path.isfile(profile_path):
183
            raise RuntimeError("'%s' is not a file!\n" % profile_path)
184 2
        profile_id, ext = os.path.splitext(os.path.basename(profile_path))
185 2
        if ext != ".profile":
186
            raise RuntimeError(
187
                "Encountered file '%s' while looking for profiles, "
188
                "extension '%s' is unknown. Skipping..\n"
189
                % (profile_path, ext)
190
            )
191
192 2
        profile = ssg.build_yaml.Profile.from_yaml(profile_path)
193 2
        if not profile:
194
            raise RuntimeError("Could not parse profile %s.\n" % profile_path)
195 2
        return profile
196
197 2
    def create_playbooks_for_all_rules(self, profile, variables):
198
        """
199
        Creates a Playbook for each rule selected in a profile from tasks
200
        extracted from snippets. Created Playbooks are parametrized by
201
        variables according to profile selection. Playbooks are written into
202
        a new subdirectory in output_dir.
203
        """
204
        profile_rules, profile_refines = self.get_profile_selections(profile)
205
206
        profile_playbooks_dir = os.path.join(self.output_dir, profile.id_)
207
        os.makedirs(profile_playbooks_dir)
208
209
        for rule_id in profile_rules:
210
            snippet_path = os.path.join(self.input_dir, rule_id + ".yml")
211
            if os.path.exists(snippet_path):
212
                self.create_playbook(
213
                    snippet_path, rule_id, variables,
214
                    profile_refines, profile_playbooks_dir
215
                )
216
217 2
    def create_playbook_for_single_rule(self, profile, rule_id, variables):
218
        """
219
        Creates a Playbook for given rule specified by a rule_id. Created
220
        Playbooks are parametrized by variables according to profile selection.
221
        Playbooks are written into a new subdirectory in output_dir.
222
        """
223 2
        profile_rules, profile_refines = self.get_profile_selections(profile)
224 2
        profile_playbooks_dir = os.path.join(self.output_dir, profile.id_)
225 2
        os.makedirs(profile_playbooks_dir)
226 2
        snippet_path = os.path.join(self.input_dir, rule_id + ".yml")
227 2
        if rule_id in profile_rules:
228 2
            self.create_playbook(
229
                snippet_path, rule_id, variables,
230
                profile_refines, profile_playbooks_dir
231
            )
232
        else:
233
            raise ValueError("Rule '%s' isn't part of profile '%s'" %
234
                             (rule_id, profile.id_))
235
236 2
    def build(self, profile_id=None, rule_id=None):
237
        """
238
        Creates Playbooks for a specified profile.
239
        If profile is not given, creates playbooks for all profiles
240
        in the product.
241
        If the rule_id is not given, Playbooks are created for every rule.
242
        """
243 2
        variables = self.get_benchmark_variables()
244 2
        if profile_id:
245 2
            profile_path = os.path.join(
246
                self.profiles_dir, profile_id + ".profile")
247 2
            profile = self.open_profile(profile_path)
248 2
            if rule_id:
249 2
                self.create_playbook_for_single_rule(profile, rule_id,
250
                                                     variables)
251
            else:
252
                self.create_playbooks_for_all_rules(profile, variables)
253
        else:
254
            # run for all profiles
255
            for profile_file in os.listdir(self.profiles_dir):
256
                profile_path = os.path.join(self.profiles_dir, profile_file)
257
                try:
258
                    profile = self.open_profile(profile_path)
259
                except RuntimeError as e:
260
                    sys.stderr.write("%s. Skipping %s." % (str(e), profile_id))
261
                    continue
262
                if rule_id:
263
                    self.create_playbook_for_single_rule(profile, rule_id,
264
                                                         variables)
265
                else:
266
                    self.create_playbooks_for_all_rules(profile, variables)
267