Test Failed
Pull Request — master (#7775)
by Matthew
02:20
created

ssg.controls.Status.__str__()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.125

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 2
ccs 1
cts 2
cp 0.5
crap 1.125
rs 10
c 0
b 0
f 0
1 2
import collections
2 2
import logging
3 2
import os
4 2
import copy
5 2
from glob import glob
6
7 2
import ssg.build_yaml
8 2
import ssg.yaml
9 2
import ssg.utils
10
11
12 2
class InvalidStatus(Exception):
    """Raised when a control declares a status outside the known set."""
14
15
16 2
class Status:
    """Known control status values, plus a thin wrapper around one of them.

    The class attributes are the plain strings accepted as a control status.
    An instance stores one such string in ``self.status`` and compares equal
    to another ``Status`` holding the same string or to the string itself.
    """
    PENDING = "pending"
    PLANNED = "planned"
    NOT_APPLICABLE = "not applicable"
    INHERENTLY_MET = "inherently met"
    DOCUMENTATION = "documentation"
    PARTIAL = "partial"
    SUPPORTED = "supported"
    AUTOMATED = "automated"
    DOES_NOT_MEET = "does not meet"

    def __init__(self, status):
        self.status = status

    @classmethod
    def from_control_info(cls, ctrl, status):
        """Validate a status value read from a control definition.

        Args:
            ctrl: Identifier of the control; only used in the error message.
            status: The status value to validate, or None.

        Returns:
            ``cls.PENDING`` when ``status`` is None, otherwise ``status``
            itself (the value is returned as given, not wrapped in a Status).

        Raises:
            InvalidStatus: If ``status`` is not one of the known values.
        """
        if status is None:
            return cls.PENDING

        # Kept as a list: it is rendered verbatim into the error message.
        valid_statuses = [
            cls.PENDING,
            cls.PLANNED,
            cls.NOT_APPLICABLE,
            cls.INHERENTLY_MET,
            cls.DOCUMENTATION,
            cls.PARTIAL,
            cls.SUPPORTED,
            cls.AUTOMATED,
            cls.DOES_NOT_MEET
        ]

        if status not in valid_statuses:
            raise InvalidStatus(
                    "The given status '{given}' in the control '{control}' "
                    "was invalid. Please use one of "
                    "the following: {valid}".format(given=status,
                                                    control=ctrl,
                                                    valid=valid_statuses))
        return status

    def __str__(self):
        return self.status

    def __eq__(self, other):
        """Compare by status text against another Status or a plain string."""
        if isinstance(other, Status):
            return self.status == other.status
        elif isinstance(other, str):
            return self.status == other
        return False

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__, so define it explicitly.
        return not self.__eq__(other)

    def __hash__(self):
        # Defining __eq__ alone makes instances unhashable on Python 3.
        # Hashing the wrapped string keeps the hash consistent with __eq__,
        # including equality against plain strings.
        return hash(self.status)
65
66
67 2
class Control(ssg.build_yaml.SelectionHandler):
    """One control of a policy, built from its dictionary definition.

    Besides the attributes set here, the base class initializer is invoked
    and ``selections`` is assigned in ``from_control_dict``; how the base
    class stores selections is not visible in this file.
    """
    def __init__(self):
        super(Control, self).__init__()
        self.id = None
        self.levels = []
        self.notes = ""
        self.title = ""
        self.description = ""
        self.automated = ""
        self.status = None
        self.mitigation = ""
        self.artifact_description = ""
        self.status_justification = ""

    def __hash__(self):
        """ Controls are meant to be unique, so using the
        ID should suffice"""
        return hash(self.id)

    @classmethod
    def from_control_dict(cls, control_dict, env_yaml=None, default_level=None):
        """Build a Control from a dictionary describing it.

        Args:
            control_dict: Mapping with the control definition; the "id" key
                is mandatory, all other keys are optional.
            env_yaml: Accepted for interface compatibility; this method does
                not read it.
            default_level: Levels assigned when the definition has no
                "levels" key. Defaults to ``["default"]``.

        Returns:
            The populated Control instance.

        Raises:
            InvalidStatus: If the "status" value is not a known status.
            ValueError: If "automated" is not 'yes', 'no' or 'partially'.
        """
        if default_level is None:
            # Build a fresh list per call; a list literal in the signature
            # would be shared by every control that falls back to it.
            default_level = ["default"]

        control = cls()
        control.id = ssg.utils.required_key(control_dict, "id")
        control.title = control_dict.get("title")
        control.description = control_dict.get("description")
        control.status = Status.from_control_info(control.id, control_dict.get("status", None))
        control.automated = control_dict.get("automated", "no")
        control.status_justification = control_dict.get('status_justification')
        control.artifact_description = control_dict.get('artifact_description')
        control.mitigation = control_dict.get('mitigation')
        # An "automated" status always implies automated == "yes",
        # regardless of what the "automated" key says.
        if control.status == "automated":
            control.automated = "yes"
        if control.automated not in ["yes", "no", "partially"]:
            msg = (
                "Invalid value '%s' of automated key in control "
                "%s '%s'. Can be only 'yes', 'no', 'partially'."
                % (control.automated,  control.id, control.title))
            raise ValueError(msg)
        control.levels = control_dict.get("levels", default_level)
        control.notes = control_dict.get("notes", "")

        # The "rules" entry is handed over as the control's selections.
        control.selections = control_dict.get("rules", {})

        control.related_rules = control_dict.get("related_rules", [])
        control.note = control_dict.get("note")
        return control
123
124
125 2
class Level:
    """A policy level: an identifier plus the levels it inherits from."""

    def __init__(self):
        self.id = None
        self.inherits_from = None

    @classmethod
    def from_level_dict(cls, level_dict):
        """Create a Level from its dictionary form.

        The "id" key is mandatory; "inherits_from" is optional and is left
        as None when absent.
        """
        new_level = cls()
        new_level.id = ssg.utils.required_key(level_dict, "id")
        new_level.inherits_from = level_dict.get("inherits_from")
        return new_level
136
137 2
class Policy():
    """A policy file together with the controls and levels it defines.

    ``controls_dir`` is the policy file path without its extension; when a
    directory exists at that path, the ``.yml`` files in it contribute
    additional controls (see ``load``).
    """
    def __init__(self, filepath, env_yaml=None):
        self.id = None
        self.env_yaml = env_yaml
        self.filepath = filepath
        self.controls_dir = os.path.splitext(filepath)[0]
        self.controls = []
        self.controls_by_id = dict()
        self.levels = []
        self.levels_by_id = dict()
        self.title = ""
        self.source = ""

    def _parse_controls_tree(self, tree):
        """Yield a Control for every node of ``tree``, nested ones included.

        Nested controls (under a node's "controls" key) are yielded before
        their parent, and the parent is updated with each of them.

        Raises:
            RuntimeError: If a node cannot be turned into a Control.
        """
        default_level = ["default"]
        if self.levels:
            # The first declared level is the fallback for controls that
            # do not list any level.
            default_level = [self.levels[0].id]

        for node in tree:
            try:
                control = Control.from_control_dict(
                    node, self.env_yaml, default_level=default_level)
            except Exception as exc:
                msg = (
                    "Unable to parse controls from {filename}: {error}"
                    .format(filename=self.filepath, error=str(exc)))
                raise RuntimeError(msg)
            if "controls" in node:
                for sc in self._parse_controls_tree(node["controls"]):
                    yield sc
                    control.update_with(sc)
            yield control

    def load(self):
        """Read the policy file and populate id, title, source, levels and controls.

        When ``controls_dir`` is a directory, controls from the policy file
        (if any) are extended with the "controls" of every ``.yml`` file in
        it; otherwise the policy file must contain a "controls" key.

        Raises:
            RuntimeError: If the controls directory contains a non-hidden
                entry that is not a ``.yml`` file, or a control cannot be
                parsed.
        """
        yaml_contents = ssg.yaml.open_and_expand(self.filepath, self.env_yaml)
        self.id = ssg.utils.required_key(yaml_contents, "id")
        self.title = ssg.utils.required_key(yaml_contents, "title")
        self.source = yaml_contents.get("source", "")

        for lv in yaml_contents.get("levels", []):
            level = Level.from_level_dict(lv)
            self.levels.append(level)
            self.levels_by_id[level.id] = level

        if os.path.isdir(self.controls_dir):
            controls_tree = yaml_contents.get("controls", list())
            # os.listdir() returns entries in arbitrary order; sort them so
            # the resulting control order does not depend on the filesystem.
            for entry in sorted(os.listdir(self.controls_dir)):
                if entry.endswith('.yml'):
                    full_path = os.path.join(self.controls_dir, entry)
                    extra_contents = ssg.yaml.open_and_expand(full_path, self.env_yaml)
                    for control in extra_contents['controls']:
                        controls_tree.append(control)
                elif entry.startswith('.'):
                    # Hidden entries are ignored.
                    continue
                else:
                    raise RuntimeError("Found non yaml file in %s" % self.controls_dir)
        else:
            controls_tree = ssg.utils.required_key(yaml_contents, "controls")
        for c in self._parse_controls_tree(controls_tree):
            self.controls.append(c)
            self.controls_by_id[c.id] = c

    def get_control(self, control_id):
        """Return the control with the given ID.

        Raises:
            ValueError: If the policy has no such control.
        """
        try:
            c = self.controls_by_id[control_id]
            return c
        except KeyError:
            msg = "%s not found in policy %s" % (
                control_id, self.id
            )
            raise ValueError(msg)

    def get_level(self, level_id):
        """Return the level with the given ID.

        Raises:
            ValueError: If the policy has no such level.
        """
        try:
            lv = self.levels_by_id[level_id]
            return lv
        except KeyError:
            msg = "Level %s not found in policy %s" % (
                level_id, self.id
            )
            raise ValueError(msg)

    def get_level_with_ancestors_sequence(self, level_id):
        """Return the level followed by all levels it transitively inherits from.

        Each level appears once, at the position where it is first reached.
        Inheritance cycles are not detected here.

        Raises:
            ValueError: If ``level_id`` or an inherited level ID is unknown.
        """
        # use OrderedDict for Python2 compatibility instead of ordered set
        levels = collections.OrderedDict()
        level = self.get_level(level_id)
        levels[level] = ""
        if level.inherits_from:
            for parent_id in level.inherits_from:
                for ancestor in self.get_level_with_ancestors_sequence(parent_id):
                    if ancestor not in levels:
                        levels[ancestor] = ""
        return list(levels.keys())
232
233
234 2
class ControlsManager():
    """Loads every policy file of a directory and answers queries about them."""
    def __init__(self, controls_dir, env_yaml=None):
        self.controls_dir = os.path.abspath(controls_dir)
        self.env_yaml = env_yaml
        self.policies = {}

    def load(self):
        """Load all ``*.yml`` policies found directly in ``controls_dir``.

        Does nothing when the directory does not exist. Policies are stored
        in ``self.policies`` keyed by their ID.
        """
        if not os.path.exists(self.controls_dir):
            return
        # controls_dir is absolute, so glob already yields full paths.
        for filepath in sorted(glob(os.path.join(self.controls_dir, "*.yml"))):
            logging.info("Found file %s", filepath)
            policy = Policy(filepath, self.env_yaml)
            policy.load()
            self.policies[policy.id] = policy

    def get_control(self, policy_id, control_id):
        """Return the control ``control_id`` of policy ``policy_id``.

        Raises:
            ValueError: If the policy or the control does not exist.
        """
        policy = self._get_policy(policy_id)
        return policy.get_control(control_id)

    def _get_policy(self, policy_id):
        """Return the loaded policy with the given ID or raise ValueError."""
        try:
            policy = self.policies[policy_id]
        except KeyError:
            msg = "policy '%s' doesn't exist" % (policy_id)
            raise ValueError(msg)
        return policy

    def get_all_controls_of_level(self, policy_id, level_id):
        """Return the controls belonging to a level or any level it inherits from.

        A variable defined by a control met earlier in the traversal is
        removed from copies of later controls that define it as well.

        Raises:
            ValueError: If the policy or the level does not exist.
        """
        policy = self._get_policy(policy_id)
        levels = policy.get_level_with_ancestors_sequence(level_id)
        all_policy_controls = self.get_all_controls(policy_id)
        eligible_controls = []
        already_defined_variables = set()
        # we will go level by level, from top to bottom
        # this is done to enable overriding of variables by higher levels
        for lv in levels:
            for control in all_policy_controls:
                if lv.id not in control.levels:
                    continue

                variables = set(control.variables.keys())

                variables_to_remove = variables.intersection(already_defined_variables)
                already_defined_variables.update(variables)

                new_c = self._get_control_without_variables(variables_to_remove, control)
                eligible_controls.append(new_c)

        return eligible_controls

    @staticmethod
    def _get_control_without_variables(variables_to_remove, control):
        """Return ``control`` without the given variables.

        The control itself is returned when there is nothing to remove;
        otherwise a deep copy is modified so the original stays intact.
        """
        if not variables_to_remove:
            return control

        new_c = copy.deepcopy(control)
        for var in variables_to_remove:
            del new_c.variables[var]
        return new_c

    def get_all_controls(self, policy_id):
        """Return all controls of the given policy.

        Raises:
            ValueError: If the policy does not exist.
        """
        policy = self._get_policy(policy_id)
        return policy.controls_by_id.values()
303