Passed
Push — master ( e9a66b...7d6652 )
by Jan
02:39 queued 12s
created

ssg.playbook_builder   B

Complexity

Total Complexity 48

Size/Duplication

Total Lines 278
Duplicated Lines 0 %

Test Coverage

Coverage 74.85%

Importance

Changes 0
Metric Value
eloc 187
dl 0
loc 278
ccs 122
cts 163
cp 0.7485
rs 8.5599
c 0
b 0
f 0
wmc 48

12 Methods

Rating   Name   Duplication   Size   Complexity  
A PlaybookBuilder.__init__() 0 13 1
B PlaybookBuilder.choose_variable_value() 0 30 7
A PlaybookBuilder.get_data_from_snippet() 0 20 4
A PlaybookBuilder.get_benchmark_variables() 0 11 2
A PlaybookBuilder.create_playbooks_for_all_rules_in_profile() 0 19 3
A PlaybookBuilder._find_rule_title() 0 4 1
A PlaybookBuilder._get_rules_variables() 0 14 5
A PlaybookBuilder.create_playbooks_for_all_rules() 0 11 3
B PlaybookBuilder.create_playbook() 0 45 8
B PlaybookBuilder.build() 0 41 8
A PlaybookBuilder.create_playbook_for_single_rule() 0 19 2
A PlaybookBuilder.open_profile() 0 18 4

How to fix   Complexity   

Complexity

Complex classes like the one in ssg.playbook_builder often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/python3
2
3 2
import os
4 2
import re
5 2
import sys
6
7 2
from collections import OrderedDict
8
9 2
import ssg.rules
10 2
import ssg.utils
11 2
import ssg.yaml
12 2
import ssg.build_yaml
13 2
import ssg.build_remediations
14
15 2
COMMENTS_TO_PARSE = ["strategy", "complexity", "disruption"]
16
17
18 2
class PlaybookBuilder():
    """
    Builds Ansible Playbooks out of per-rule Ansible snippets.

    Every rule snippet found in input_dir is turned into a standalone
    Playbook whose variables are filled in according to the selected
    profile. Playbooks are written to subdirectories of output_dir.
    """

    def __init__(self, product_yaml_path, input_dir, output_dir, rules_dir,
                 profiles_dir):
        """
        Remembers the working directories and locates the benchmark.

        product_yaml_path -- path to the product YAML; it has to contain
                             the "benchmark_root" key and may contain
                             "additional_content_directories", both given
                             relative to the directory of the product YAML
        input_dir -- directory containing Ansible snippets "<rule_id>.yml"
        output_dir -- directory where Playbook subdirectories are created
        rules_dir -- directory containing rule files "<rule_id>.yml"
        profiles_dir -- directory containing "*.profile" files
        """
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.rules_dir = rules_dir
        self.profiles_dir = profiles_dir

        product_yaml = ssg.yaml.open_raw(product_yaml_path)
        product_dir = os.path.dirname(product_yaml_path)
        relative_guide_dir = ssg.utils.required_key(product_yaml,
                                                    "benchmark_root")
        self.guide_dir = os.path.abspath(os.path.join(product_dir,
                                                      relative_guide_dir))
        additional_content_directories = product_yaml.get(
            "additional_content_directories", [])
        self.add_content_dirs = [
            os.path.abspath(os.path.join(product_dir, rd))
            for rd in additional_content_directories
        ]

    def choose_variable_value(self, var_id, variables, refinements):
        """
        Determine value of variable based on profile refinements.

        variables maps variable IDs to dictionaries of selector -> value,
        refinements maps variable IDs to the selector chosen by a profile
        (it may be None or empty, in which case "default" is used).

        Raises ValueError if the variable is unknown or if the selector
        cannot be resolved to a value.
        """
        if refinements and var_id in refinements:
            selector = refinements[var_id]
        else:
            selector = "default"
        try:
            options = variables[var_id]
        except KeyError:
            raise ValueError("Variable '%s' doesn't exist." % var_id)
        try:
            return options[selector]
        except KeyError:
            pass
        # Fall back to the first option in two situations:
        #  - there is just one option, so we assume it could be selected
        #    as a default option;
        #  - no 'default' selector is present in the XCCDF value, then the
        #    first one is chosen (consistent with oscap behavior).
        # A variable without any option can't be resolved at all and is
        # reported as ValueError like every other resolution failure.
        if options and (len(options) == 1 or selector == "default"):
            return next(iter(options.values()))
        raise ValueError(
            "Selector '%s' doesn't exist in variable '%s'. "
            "Available selectors: %s." %
            (selector, var_id, ", ".join(options.keys()))
        )

    def get_data_from_snippet(self, snippet_yaml, variables, refinements):
        """
        Extracts and resolves tasks and variables from Ansible snippet.

        String items of the snippet have to be variable references in the
        form "(xccdf-var <var_id>)", every other item is treated as a task.
        Returns a tuple (tasks, values) where tasks is a list of the task
        items and values is a dictionary var_id -> resolved value.

        Raises ValueError on a string item which isn't a variable
        reference or when a variable can't be resolved.
        """
        xccdf_var_pattern = re.compile(r"\(xccdf-var\s+(\S+)\)")
        tasks = []
        values = dict()
        if snippet_yaml is None:
            # Presumably what the YAML loader returns for an empty snippet
            # (PyYAML does so for an empty document); treat it as a snippet
            # without items so that the caller can report a missing task
            # instead of failing with TypeError here.
            snippet_yaml = []
        for item in snippet_yaml:
            if not isinstance(item, str):
                tasks.append(item)
                continue
            match = xccdf_var_pattern.match(item)
            if not match:
                raise ValueError("Found unknown item '%s'" % item)
            var_id = match.group(1)
            values[var_id] = self.choose_variable_value(var_id, variables,
                                                        refinements)
        return tasks, values

    def get_benchmark_variables(self):
        """
        Get all variables, their selectors and values used in a given
        benchmark. Returns a dictionary where keys are variable IDs and
        values are dictionaries where keys are selectors and values are
        variable values.
        """
        variables = dict()
        # Additional content directories are processed after the guide
        # directory, so their variables override the same-named ones.
        for cur_dir in [self.guide_dir] + self.add_content_dirs:
            variables.update(self._get_rules_variables(cur_dir))
        return variables

    def _get_rules_variables(self, base_dir):
        """
        Walks base_dir recursively and yields a tuple (variable ID, options)
        for every "*.var" file found. Options is a dictionary
        selector -> value with both converted to strings.
        """
        for dirpath, dirnames, filenames in os.walk(base_dir):
            # Sorting in place makes the traversal order deterministic.
            dirnames.sort()
            filenames.sort()
            for filename in filenames:
                _, ext = os.path.splitext(filename)
                if ext != ".var":
                    continue
                full_path = os.path.join(dirpath, filename)
                xccdf_value = ssg.build_yaml.Value.from_yaml(full_path)
                # Make sure that selectors and values are strings
                options = dict(
                    (str(k), str(v)) for k, v in xccdf_value.options.items()
                )
                yield (xccdf_value.id_, options,)

    def _find_rule_title(self, rule_id):
        """
        Returns the value of the "title" key of "<rule_id>.yml" located
        in rules_dir.
        """
        rule_path = os.path.join(self.rules_dir, rule_id + ".yml")
        rule_yaml = ssg.yaml.open_raw(rule_path)
        return rule_yaml["title"]

    def create_playbook(self, snippet_path, rule_id, variables,
                        refinements, output_dir):
        """
        Creates a Playbook from Ansible snippet for the given rule specified
        by rule ID, fills in the profile values and saves it into output_dir.

        Raises ValueError if the snippet doesn't contain any task.
        """

        with open(snippet_path, "r") as snippet_file:
            snippet_str = snippet_file.read()
        fix = ssg.build_remediations.split_remediation_content_and_metadata(snippet_str)
        snippet_yaml = ssg.yaml.ordered_load(fix.contents)

        play_tasks, play_vars = self.get_data_from_snippet(
            snippet_yaml, variables, refinements
        )

        if not play_tasks:
            raise ValueError(
                "Ansible remediation for rule '%s' in '%s' "
                "doesn't contain any task." %
                (rule_id, snippet_path)
            )

        # Tags are moved from the individual tasks to the play level.
        tags = set()
        for task in play_tasks:
            tags |= set(task.pop("tags", []))

        play = OrderedDict()
        play["name"] = self._find_rule_title(rule_id)
        play["hosts"] = "@@HOSTS@@"
        play["become"] = True
        if play_vars:
            play["vars"] = play_vars
        if tags:
            play["tags"] = sorted(tags)
        play["tasks"] = play_tasks

        playbook = [play]
        playbook_path = os.path.join(output_dir, rule_id + ".yml")
        with open(playbook_path, "w") as playbook_file:
            # write remediation metadata (complexity, strategy, etc.) first
            for k, v in fix.config.items():
                playbook_file.write("# %s = %s\n" % (k, v))
            ssg.yaml.ordered_dump(
                playbook, playbook_file, default_flow_style=False
            )

    def open_profile(self, profile_path):
        """
        Opens and parses profile at the given profile_path.

        Raises RuntimeError if profile_path isn't a file, doesn't have
        the ".profile" extension or can't be parsed.
        """
        if not os.path.isfile(profile_path):
            raise RuntimeError("'%s' is not a file!\n" % profile_path)
        profile_id, ext = os.path.splitext(os.path.basename(profile_path))
        if ext != ".profile":
            raise RuntimeError(
                "Encountered file '%s' while looking for profiles, "
                "extension '%s' is unknown. Skipping..\n"
                % (profile_path, ext)
            )

        profile = ssg.build_yaml.Profile.from_yaml(profile_path)
        if not profile:
            raise RuntimeError("Could not parse profile %s.\n" % profile_path)
        return profile

    def create_playbooks_for_all_rules_in_profile(self, profile, variables):
        """
        Creates a Playbook for each rule selected in a profile from tasks
        extracted from snippets. Created Playbooks are parametrized by
        variables according to profile selection. Playbooks are written into
        a new subdirectory in output_dir.

        Rules without a snippet in input_dir are skipped.
        """
        profile_rules = profile.get_rule_selectors()
        profile_refines = profile.get_variable_selectors()

        profile_playbooks_dir = os.path.join(self.output_dir, profile.id_)
        os.makedirs(profile_playbooks_dir)

        for rule_id in profile_rules:
            snippet_path = os.path.join(self.input_dir, rule_id + ".yml")
            if not os.path.exists(snippet_path):
                continue
            self.create_playbook(
                snippet_path, rule_id, variables,
                profile_refines, profile_playbooks_dir
            )

    def create_playbook_for_single_rule(self, profile, rule_id, variables):
        """
        Creates a Playbook for given rule specified by a rule_id. Created
        Playbooks are parametrized by variables according to profile selection.
        Playbooks are written into a new subdirectory in output_dir.

        Raises ValueError if the rule isn't selected by the profile.
        """
        profile_rules = profile.get_rule_selectors()
        # Validate the request before touching the file system so that
        # a rejected rule doesn't leave an empty profile directory behind.
        if rule_id not in profile_rules:
            raise ValueError("Rule '%s' isn't part of profile '%s'" %
                             (rule_id, profile.id_))
        profile_refines = profile.get_variable_selectors()
        profile_playbooks_dir = os.path.join(self.output_dir, profile.id_)
        os.makedirs(profile_playbooks_dir)
        snippet_path = os.path.join(self.input_dir, rule_id + ".yml")
        self.create_playbook(
            snippet_path, rule_id, variables,
            profile_refines, profile_playbooks_dir
        )

    def create_playbooks_for_all_rules(self, variables):
        """
        Creates a Playbook for every rule found in rules_dir which has
        a snippet in input_dir. No profile refinements are applied.
        Playbooks are written into a new subdirectory "all" in output_dir.
        """
        profile_playbooks_dir = os.path.join(self.output_dir, "all")
        os.makedirs(profile_playbooks_dir)
        for rule in sorted(os.listdir(self.rules_dir)):
            rule_id, _ = os.path.splitext(rule)
            snippet_path = os.path.join(self.input_dir, rule_id + ".yml")
            if not os.path.exists(snippet_path):
                continue
            self.create_playbook(
                snippet_path, rule_id, variables,
                None, profile_playbooks_dir
            )

    def build(self, profile_id=None, rule_id=None):
        """
        Creates Playbooks for a specified profile.
        If profile is not given, creates playbooks for all profiles
        in the product.
        If the rule_id is not given, Playbooks are created for every rule.

        Profiles which are incomplete or can't be opened are reported on
        standard error output and skipped. Raises KeyError if profile_id
        is given but no such profile has been loaded.
        """
        variables = self.get_benchmark_variables()
        profiles = {}
        for profile_file in sorted(os.listdir(self.profiles_dir)):
            profile_path = os.path.join(self.profiles_dir, profile_file)
            try:
                profile = self.open_profile(profile_path)
            except ssg.yaml.DocumentationNotComplete:
                msg = "Skipping incomplete profile {0}. To include incomplete " + \
                    "profiles, build in debug mode.\n"
                sys.stderr.write(msg.format(profile_path))
                continue
            except RuntimeError as e:
                sys.stderr.write(str(e))
                continue
            profiles[profile.id_] = profile

        if profile_id:
            to_process = [profile_id]
        else:
            to_process = list(profiles.keys())

        for p in to_process:
            profile = profiles[p]
            if rule_id:
                self.create_playbook_for_single_rule(profile, rule_id,
                                                     variables)
            else:
                self.create_playbooks_for_all_rules_in_profile(
                    profile, variables)

        if not profile_id:
            # build playbooks for virtual '(all)' profile
            # this virtual profile contains all rules in the product
            self.create_playbooks_for_all_rules(variables)
278