Test Failed
Pull Request — master (#7716)
by Matěj
02:51
created

ssg.controls   B

Complexity

Total Complexity 50

Size/Duplication

Total Lines 279
Duplicated Lines 0 %

Test Coverage

Coverage 22.55%

Importance

Changes 0
Metric Value
eloc 227
dl 0
loc 279
ccs 46
cts 204
cp 0.2255
rs 8.4
c 0
b 0
f 0
wmc 50

22 Methods

Rating   Name   Duplication   Size   Complexity  
A ControlsManager._get_control_without_variables() 0 9 3
A Level.__init__() 0 3 1
A ControlsManager._get_policy() 0 7 2
A Policy.get_level_with_ancestors_sequence() 0 11 4
A Status.__init__() 0 2 1
A Control.__hash__() 0 4 1
A Level.from_level_dict() 0 6 1
A Status.__str__() 0 2 1
A Status.from_control_info() 0 24 3
A ControlsManager.load() 0 9 3
A ControlsManager.get_control() 0 8 2
A Policy.__init__() 0 10 1
B Policy._parse_controls_tree() 0 19 6
A Status.__eq__() 0 6 3
A Control.from_control_dict() 0 34 4
A ControlsManager.get_all_controls_of_level() 0 22 4
A Control.__init__() 0 9 1
A Policy.get_level() 0 9 2
A ControlsManager.__init__() 0 4 1
A ControlsManager.get_all_controls() 0 3 1
A Policy.get_control() 0 9 2
A Policy.load() 0 16 3

How to fix   Complexity   

Complexity

Complex classes like ssg.controls often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 2
import collections
2 2
import logging
3 2
import os
4 2
import copy
5 2
from glob import glob
6
7 2
import ssg.build_yaml
8 2
import ssg.yaml
9 2
import ssg.utils
10
11
12 2
class InvalidStatus(Exception):
13 2
    pass
14
15 2
class Status():
16 2
    PENDING = "pending"
17 2
    PLANNED = "planned"
18 2
    NOT_APPLICABLE = "not applicable"
19 2
    INHERENTLY_MET = "inherently met"
20 2
    DOCUMENTATION = "documentation"
21 2
    PARTIAL = "partial"
22 2
    SUPPORTED = "supported"
23 2
    AUTOMATED = "automated"
24
25 2
    def __init__(self, status):
26
        self.status = status
27
28 2
    @classmethod
29
    def from_control_info(cls, ctrl, status):
30
        if status is None:
31
            return cls.PENDING
32
33
        valid_statuses = [
34
            cls.PENDING,
35
            cls.PLANNED,
36
            cls.NOT_APPLICABLE,
37
            cls.INHERENTLY_MET,
38
            cls.DOCUMENTATION,
39
            cls.PARTIAL,
40
            cls.SUPPORTED,
41
            cls.AUTOMATED,
42
        ]
43
44
        if status not in valid_statuses:
45
            raise InvalidStatus(
46
                    "The given status '{given}' in the control '{control}' "
47
                    "was invalid. Please use one of "
48
                    "the following: {valid}".format(given=status,
49
                                                    control=ctrl,
50
                                                    valid=valid_statuses))
51
        return status
52
53 2
    def __str__(self):
54
        return self.status
55
56 2
    def __eq__(self, other):
57
        if isinstance(other, Status):
58
            return self.status == other.status
59
        elif isinstance(other, str):
60
            return self.status == other
61
        return False
62
63
64 2
class Control(ssg.build_yaml.SelectionHandler):
65 2
    def __init__(self):
66
        super(Control, self).__init__()
67
        self.id = None
68
        self.levels = []
69
        self.notes = ""
70
        self.title = ""
71
        self.description = ""
72
        self.automated = ""
73
        self.status = None
74
75 2
    def __hash__(self):
76
        """ Controls are meant to be unique, so using the
77
        ID should suffice"""
78
        return hash(self.id)
79
80 2
    @classmethod
81 2
    def from_control_dict(cls, control_dict, env_yaml=None, default_level=["default"]):
82
        control = cls()
83
        control.id = ssg.utils.required_key(control_dict, "id")
84
        control.title = control_dict.get("title")
85
        control.description = control_dict.get("description")
86
        control.status = Status.from_control_info(control.id, control_dict.get("status", None))
87
        control.automated = control_dict.get("automated", "no")
88
        if control.status == "automated":
89
            control.automated = "yes"
90
        if control.automated not in ["yes", "no", "partially"]:
91
            msg = (
92
                "Invalid value '%s' of automated key in control "
93
                "%s '%s'. Can be only 'yes', 'no', 'partially'."
94
                % (control.automated,  control.id, control.title))
95
            raise ValueError(msg)
96
        control.levels = control_dict.get("levels", default_level)
97
        control.notes = control_dict.get("notes", "")
98
        selections = control_dict.get("rules", {})
99
100
        product = None
101
        product_dir = None
102
        benchmark_root = None
103
        if env_yaml:
104
            product = env_yaml.get('product', None)
105
            product_dir = env_yaml.get('product_dir', None)
106
            benchmark_root = env_yaml.get('benchmark_root', None)
107
            content_dir = os.path.join(product_dir, benchmark_root)
108
109
        control.selections = selections
110
111
        control.related_rules = control_dict.get("related_rules", [])
112
        control.note = control_dict.get("note")
113
        return control
114
115
116 2
class Level():
117 2
    def __init__(self):
118
        self.id = None
119
        self.inherits_from = None
120
121 2
    @classmethod
122
    def from_level_dict(cls, level_dict):
123
        level = cls()
124
        level.id = ssg.utils.required_key(level_dict, "id")
125
        level.inherits_from = level_dict.get("inherits_from")
126
        return level
127
128 2
class Policy():
129 2
    def __init__(self, filepath, env_yaml=None):
130
        self.id = None
131
        self.env_yaml = env_yaml
132
        self.filepath = filepath
133
        self.controls = []
134
        self.controls_by_id = dict()
135
        self.levels = []
136
        self.levels_by_id = dict()
137
        self.title = ""
138
        self.source = ""
139
140 2
    def _parse_controls_tree(self, tree):
141
        default_level = ["default"]
142
        if self.levels:
143
            default_level = [self.levels[0].id]
144
145
        for node in tree:
146
            try:
147
                control = Control.from_control_dict(
148
                    node, self.env_yaml, default_level=default_level)
149
            except Exception as exc:
150
                msg = (
151
                    "Unable to parse controls from {filename}: {error}"
152
                    .format(filename=self.filepath, error=str(exc)))
153
                raise RuntimeError(msg)
154
            if "controls" in node:
155
                for sc in self._parse_controls_tree(node["controls"]):
156
                    yield sc
157
                    control.update_with(sc)
158
            yield control
159
160 2
    def load(self):
161
        yaml_contents = ssg.yaml.open_and_expand(self.filepath, self.env_yaml)
162
        self.id = ssg.utils.required_key(yaml_contents, "id")
163
        self.title = ssg.utils.required_key(yaml_contents, "title")
164
        self.source = yaml_contents.get("source", "")
165
166
        level_list = yaml_contents.get("levels", [])
167
        for lv in level_list:
168
            level = Level.from_level_dict(lv)
169
            self.levels.append(level)
170
            self.levels_by_id[level.id] = level
171
172
        controls_tree = ssg.utils.required_key(yaml_contents, "controls")
173
        for c in self._parse_controls_tree(controls_tree):
174
            self.controls.append(c)
175
            self.controls_by_id[c.id] = c
176
177 2
    def get_control(self, control_id):
178
        try:
179
            c = self.controls_by_id[control_id]
180
            return c
181
        except KeyError:
182
            msg = "%s not found in policy %s" % (
183
                control_id, self.id
184
            )
185
            raise ValueError(msg)
186
187 2
    def get_level(self, level_id):
188
        try:
189
            lv = self.levels_by_id[level_id]
190
            return lv
191
        except KeyError:
192
            msg = "Level %s not found in policy %s" % (
193
                level_id, self.id
194
            )
195
            raise ValueError(msg)
196
197 2
    def get_level_with_ancestors_sequence(self, level_id):
198
        # use OrderedDict for Python2 compatibility instead of ordered set
199
        levels = collections.OrderedDict()
200
        level = self.get_level(level_id)
201
        levels[level] = ""
202
        if level.inherits_from:
203
            for lv in level.inherits_from:
204
                eligible_levels = [l for l in self.get_level_with_ancestors_sequence(lv) if l not in levels.keys()]
205
                for l in eligible_levels:
206
                    levels[l] = ""
207
        return list(levels.keys())
208
209
210 2
class ControlsManager():
211 2
    def __init__(self, controls_dir, env_yaml=None):
212
        self.controls_dir = os.path.abspath(controls_dir)
213
        self.env_yaml = env_yaml
214
        self.policies = {}
215
216 2
    def load(self):
217
        if not os.path.exists(self.controls_dir):
218
            return
219
        for filename in sorted(glob(os.path.join(self.controls_dir, "*.yml"))):
220
            logging.info("Found file %s" % (filename))
221
            filepath = os.path.join(self.controls_dir, filename)
222
            policy = Policy(filepath, self.env_yaml)
223
            policy.load()
224
            self.policies[policy.id] = policy
225
226 2
    def get_control(self, policy_id, control_id):
227
        try:
228
            policy = self.policies[policy_id]
229
        except KeyError:
230
            msg = "policy '%s' doesn't exist" % (policy_id)
231
            raise ValueError(msg)
232
        control = policy.get_control(control_id)
233
        return control
234
235 2
    def _get_policy(self, policy_id):
236
        try:
237
            policy = self.policies[policy_id]
238
        except KeyError:
239
            msg = "policy '%s' doesn't exist" % (policy_id)
240
            raise ValueError(msg)
241
        return policy
242
243 2
    def get_all_controls_of_level(self, policy_id, level_id):
244
        policy = self._get_policy(policy_id)
245
        levels = policy.get_level_with_ancestors_sequence(level_id)
246
        all_policy_controls = self.get_all_controls(policy_id)
247
        eligible_controls = []
248
        already_defined_variables = set()
249
        # we will go level by level, from top to bottom
250
        # this is done to enable overriding of variables by higher levels
251
        for lv in levels:
252
            for control in all_policy_controls:
253
                if lv.id not in control.levels:
254
                    continue
255
256
                variables = set(control.variables.keys())
257
258
                variables_to_remove = variables.intersection(already_defined_variables)
259
                already_defined_variables.update(variables)
260
261
                new_c = self._get_control_without_variables(variables_to_remove, control)
262
                eligible_controls.append(new_c)
263
264
        return eligible_controls
265
266 2
    @staticmethod
267
    def _get_control_without_variables(variables_to_remove, control):
268
        if not variables_to_remove:
269
            return control
270
271
        new_c = copy.deepcopy(control)
272
        for var in variables_to_remove:
273
            del new_c.variables[var]
274
        return new_c
275
276 2
    def get_all_controls(self, policy_id):
277
        policy = self._get_policy(policy_id)
278
        return policy.controls_by_id.values()
279