Test Failed
Push — master ( 08436c...bdc824 )
by Jan
01:36 queued 21s
created

ssg.controls.Policy.load()   C

Complexity

Conditions 10

Size

Total Lines 33
Code Lines 29

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 100.0029

Importance

Changes 0
Metric Value
cc 10
eloc 29
nop 1
dl 0
loc 33
ccs 1
cts 29
cp 0.0345
crap 100.0029
rs 5.9999
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like ssg.controls.Policy.load() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 2
import collections
2 2
import logging
3 2
import os
4 2
import copy
5 2
from glob import glob
6
7 2
import ssg.build_yaml
8 2
import ssg.yaml
9 2
import ssg.utils
10
11
12 2
class InvalidStatus(Exception):
13 2
    pass
14
15
16 2
class Status:
17 2
    PENDING = "pending"
18 2
    PLANNED = "planned"
19 2
    NOT_APPLICABLE = "not applicable"
20 2
    INHERENTLY_MET = "inherently met"
21 2
    DOCUMENTATION = "documentation"
22 2
    PARTIAL = "partial"
23 2
    SUPPORTED = "supported"
24 2
    AUTOMATED = "automated"
25 2
    MANUAL = "manual"
26 2
    DOES_NOT_MEET = "does not meet"
27
28 2
    def __init__(self, status):
29
        self.status = status
30
31 2
    @classmethod
32
    def from_control_info(cls, ctrl, status):
33
        if status is None:
34
            return cls.PENDING
35
36
        valid_statuses = [
37
            cls.PENDING,
38
            cls.PLANNED,
39
            cls.NOT_APPLICABLE,
40
            cls.INHERENTLY_MET,
41
            cls.DOCUMENTATION,
42
            cls.PARTIAL,
43
            cls.SUPPORTED,
44
            cls.AUTOMATED,
45
            cls.MANUAL,
46
            cls.DOES_NOT_MEET
47
        ]
48
49
        if status not in valid_statuses:
50
            raise InvalidStatus(
51
                    "The given status '{given}' in the control '{control}' "
52
                    "was invalid. Please use one of "
53
                    "the following: {valid}".format(given=status,
54
                                                    control=ctrl,
55
                                                    valid=valid_statuses))
56
        return status
57
58 2
    def __str__(self):
59
        return self.status
60
61 2
    def __eq__(self, other):
62
        if isinstance(other, Status):
63
            return self.status == other.status
64
        elif isinstance(other, str):
65
            return self.status == other
66
        return False
67
68
69 2
class Control(ssg.build_yaml.SelectionHandler):
70 2
    def __init__(self):
71
        super(Control, self).__init__()
72
        self.id = None
73
        self.levels = []
74
        self.notes = ""
75
        self.title = ""
76
        self.description = ""
77
        self.rationale = ""
78
        self.automated = ""
79
        self.status = None
80
        self.mitigation = ""
81
        self.artifact_description = ""
82
        self.status_justification = ""
83
        self.fixtext = ""
84
        self.check = ""
85
86 2
    def __hash__(self):
87
        """ Controls are meant to be unique, so using the
88
        ID should suffice"""
89
        return hash(self.id)
90
91 2
    @classmethod
92 2
    def from_control_dict(cls, control_dict, env_yaml=None, default_level=["default"]):
93
        control = cls()
94
        control.id = ssg.utils.required_key(control_dict, "id")
95
        control.title = control_dict.get("title")
96
        control.description = control_dict.get("description")
97
        control.rationale = control_dict.get("rationale")
98
        control.status = Status.from_control_info(control.id, control_dict.get("status", None))
99
        control.automated = control_dict.get("automated", "no")
100
        control.status_justification = control_dict.get('status_justification')
101
        control.artifact_description = control_dict.get('artifact_description')
102
        control.mitigation = control_dict.get('mitigation')
103
        control.fixtext = control_dict.get('fixtext')
104
        control.check = control_dict.get('check')
105
        if control.status == "automated":
106
            control.automated = "yes"
107
        if control.automated not in ["yes", "no", "partially"]:
108
            msg = (
109
                "Invalid value '%s' of automated key in control "
110
                "%s '%s'. Can be only 'yes', 'no', 'partially'."
111
                % (control.automated,  control.id, control.title))
112
            raise ValueError(msg)
113
        control.levels = control_dict.get("levels", default_level)
114
        control.notes = control_dict.get("notes", "")
115
        selections = control_dict.get("rules", {})
116
117
        product = None
118
        product_dir = None
119
        benchmark_root = None
120
        if env_yaml:
121
            product = env_yaml.get('product', None)
122
            product_dir = env_yaml.get('product_dir', None)
123
            benchmark_root = env_yaml.get('benchmark_root', None)
124
            content_dir = os.path.join(product_dir, benchmark_root)
125
126
        control.selections = selections
127
128
        control.related_rules = control_dict.get("related_rules", [])
129
        control.note = control_dict.get("note")
130
        return control
131
132
133 2
class Level():
134 2
    def __init__(self):
135
        self.id = None
136
        self.inherits_from = None
137
138 2
    @classmethod
139
    def from_level_dict(cls, level_dict):
140
        level = cls()
141
        level.id = ssg.utils.required_key(level_dict, "id")
142
        level.inherits_from = level_dict.get("inherits_from")
143
        return level
144
145 2
class Policy():
146 2
    def __init__(self, filepath, env_yaml=None):
147
        self.id = None
148
        self.env_yaml = env_yaml
149
        self.filepath = filepath
150
        self.controls_dir = os.path.splitext(filepath)[0]
151
        self.controls = []
152
        self.controls_by_id = dict()
153
        self.levels = []
154
        self.levels_by_id = dict()
155
        self.title = ""
156
        self.source = ""
157
158 2
    def _parse_controls_tree(self, tree):
159
        default_level = ["default"]
160
        if self.levels:
161
            default_level = [self.levels[0].id]
162
163
        for node in tree:
164
            try:
165
                control = Control.from_control_dict(
166
                    node, self.env_yaml, default_level=default_level)
167
            except Exception as exc:
168
                msg = (
169
                    "Unable to parse controls from {filename}: {error}"
170
                    .format(filename=self.filepath, error=str(exc)))
171
                raise RuntimeError(msg)
172
            if "controls" in node:
173
                for sc in self._parse_controls_tree(node["controls"]):
174
                    yield sc
175
                    control.update_with(sc)
176
            yield control
177
178 2
    def load(self):
179
        yaml_contents = ssg.yaml.open_and_expand(self.filepath, self.env_yaml)
180
        controls_dir = yaml_contents.get("controls_dir")
181
        if controls_dir:
182
            self.controls_dir = os.path.join(os.path.dirname(self.filepath), controls_dir)
183
        self.id = ssg.utils.required_key(yaml_contents, "id")
184
        self.title = ssg.utils.required_key(yaml_contents, "title")
185
        self.source = yaml_contents.get("source", "")
186
187
        level_list = yaml_contents.get("levels", [])
188
        for lv in level_list:
189
            level = Level.from_level_dict(lv)
190
            self.levels.append(level)
191
            self.levels_by_id[level.id] = level
192
193
        if os.path.exists(self.controls_dir) and os.path.isdir(self.controls_dir):
194
            controls_tree = yaml_contents.get("controls", list())
195
            files = os.listdir(self.controls_dir)
196
            for file in files:
197
                if file.endswith('.yml'):
198
                    full_path = os.path.join(self.controls_dir, file)
199
                    yaml_contents = ssg.yaml.open_and_expand(full_path, self.env_yaml)
200
                    for control in yaml_contents['controls']:
201
                        controls_tree.append(control)
202
                elif file.startswith('.'):
203
                    continue
204
                else:
205
                    raise RuntimeError("Found non yaml file in %s" % self.controls_dir)
206
        else:
207
            controls_tree = ssg.utils.required_key(yaml_contents, "controls")
208
        for c in self._parse_controls_tree(controls_tree):
209
            self.controls.append(c)
210
            self.controls_by_id[c.id] = c
211
212 2
    def get_control(self, control_id):
213
        try:
214
            c = self.controls_by_id[control_id]
215
            return c
216
        except KeyError:
217
            msg = "%s not found in policy %s" % (
218
                control_id, self.id
219
            )
220
            raise ValueError(msg)
221
222 2
    def get_level(self, level_id):
223
        try:
224
            lv = self.levels_by_id[level_id]
225
            return lv
226
        except KeyError:
227
            msg = "Level %s not found in policy %s" % (
228
                level_id, self.id
229
            )
230
            raise ValueError(msg)
231
232 2
    def get_level_with_ancestors_sequence(self, level_id):
233
        # use OrderedDict for Python2 compatibility instead of ordered set
234
        levels = collections.OrderedDict()
235
        level = self.get_level(level_id)
236
        levels[level] = ""
237
        if level.inherits_from:
238
            for lv in level.inherits_from:
239
                eligible_levels = [l for l in self.get_level_with_ancestors_sequence(lv) if l not in levels.keys()]
240
                for l in eligible_levels:
241
                    levels[l] = ""
242
        return list(levels.keys())
243
244
245 2
class ControlsManager():
246 2
    def __init__(self, controls_dir, env_yaml=None):
247
        self.controls_dir = os.path.abspath(controls_dir)
248
        self.env_yaml = env_yaml
249
        self.policies = {}
250
251 2
    def load(self):
252
        if not os.path.exists(self.controls_dir):
253
            return
254
        for filename in sorted(glob(os.path.join(self.controls_dir, "*.yml"))):
255
            logging.info("Found file %s" % (filename))
256
            filepath = os.path.join(self.controls_dir, filename)
257
            policy = Policy(filepath, self.env_yaml)
258
            policy.load()
259
            self.policies[policy.id] = policy
260
261 2
    def get_control(self, policy_id, control_id):
262
        try:
263
            policy = self.policies[policy_id]
264
        except KeyError:
265
            msg = "policy '%s' doesn't exist" % (policy_id)
266
            raise ValueError(msg)
267
        control = policy.get_control(control_id)
268
        return control
269
270 2
    def _get_policy(self, policy_id):
271
        try:
272
            policy = self.policies[policy_id]
273
        except KeyError:
274
            msg = "policy '%s' doesn't exist" % (policy_id)
275
            raise ValueError(msg)
276
        return policy
277
278 2
    def get_all_controls_of_level(self, policy_id, level_id):
279
        policy = self._get_policy(policy_id)
280
        levels = policy.get_level_with_ancestors_sequence(level_id)
281
        all_policy_controls = self.get_all_controls(policy_id)
282
        eligible_controls = []
283
        already_defined_variables = set()
284
        # we will go level by level, from top to bottom
285
        # this is done to enable overriding of variables by higher levels
286
        for lv in levels:
287
            for control in all_policy_controls:
288
                if lv.id not in control.levels:
289
                    continue
290
291
                variables = set(control.variables.keys())
292
293
                variables_to_remove = variables.intersection(already_defined_variables)
294
                already_defined_variables.update(variables)
295
296
                new_c = self._get_control_without_variables(variables_to_remove, control)
297
                eligible_controls.append(new_c)
298
299
        return eligible_controls
300
301 2
    @staticmethod
302
    def _get_control_without_variables(variables_to_remove, control):
303
        if not variables_to_remove:
304
            return control
305
306
        new_c = copy.deepcopy(control)
307
        for var in variables_to_remove:
308
            del new_c.variables[var]
309
        return new_c
310
311 2
    def get_all_controls(self, policy_id):
312
        policy = self._get_policy(policy_id)
313
        return policy.controls_by_id.values()
314