Test Failed
Push — master ( 6149c6...e6e408 )
by Matthew
02:55 queued 14s
created

ssg.controls.Policy.check_all_rules_exist()   A

Complexity

Conditions 3

Size

Total Lines 7
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 12

Importance

Changes 0
Metric Value
cc 3
eloc 7
nop 2
dl 0
loc 7
ccs 0
cts 6
cp 0
crap 12
rs 10
c 0
b 0
f 0
1
import collections
2
import os
3
import copy
4
from glob import glob
5
6
import ssg.entities.common
7
import ssg.yaml
8
import ssg.utils
9
10
11
class InvalidStatus(Exception):
12
    pass
13
14
15
class Status:
16
    SUPPORTED = "supported"
17
    PLANNED = "planned"
18
    PENDING = "pending"
19
    PARTIAL = "partial"
20
    NOT_APPLICABLE = "not applicable"
21
    MANUAL = "manual"
22
    INHERENTLY_MET = "inherently met"
23
    DOES_NOT_MEET = "does not meet"
24
    DOCUMENTATION = "documentation"
25
    AUTOMATED = "automated"
26
27
    def __init__(self, status):
28
        self.status = status
29
30
    @classmethod
31
    def get_status_list(cls):
32
        valid_statuses = [
33
            cls.AUTOMATED,
34
            cls.DOCUMENTATION,
35
            cls.DOES_NOT_MEET,
36
            cls.INHERENTLY_MET,
37
            cls.MANUAL,
38
            cls.NOT_APPLICABLE,
39
            cls.PARTIAL,
40
            cls.PENDING,
41
            cls.PLANNED,
42
            cls.SUPPORTED,
43
        ]
44
        return valid_statuses
45
46
    @classmethod
47
    def from_control_info(cls, ctrl, status):
48
        if status is None:
49
            return cls.PENDING
50
51
        valid_statuses = cls.get_status_list()
52
53
        if status not in valid_statuses:
54
            raise InvalidStatus(
55
                    "The given status '{given}' in the control '{control}' "
56
                    "was invalid. Please use one of "
57
                    "the following: {valid}".format(given=status,
58
                                                    control=ctrl,
59
                                                    valid=valid_statuses))
60
        return status
61
62
    def __str__(self):
63
        return self.status
64
65
    def __eq__(self, other):
66
        if isinstance(other, Status):
67
            return self.status == other.status
68
        elif isinstance(other, str):
69
            return self.status == other
70
        return False
71
72
73
class Control(ssg.entities.common.SelectionHandler, ssg.entities.common.XCCDFEntity):
74
    KEYS = dict(
75
        id=str,
76
        levels=list,
77
        notes=str,
78
        title=str,
79
        description=str,
80
        rationale=str,
81
        automated=str,
82
        status=None,
83
        mitigation=str,
84
        artifact_description=str,
85
        status_justification=str,
86
        fixtext=str,
87
        check=str,
88
        tickets=list,
89
        original_title=str,
90
        related_rules=list,
91
        rules=list,
92
        controls=list,
93
    )
94
95
    MANDATORY_KEYS = {
96
        "title",
97
    }
98
99
    def __init__(self):
100
        super(Control, self).__init__()
101
        self.id = None
102
        self.levels = []
103
        self.notes = ""
104
        self.title = ""
105
        self.description = ""
106
        self.rationale = ""
107
        self.automated = ""
108
        self.status = None
109
        self.mitigation = ""
110
        self.artifact_description = ""
111
        self.status_justification = ""
112
        self.fixtext = ""
113
        self.check = ""
114
        self.controls = []
115
        self.tickets = []
116
        self.original_title = ""
117
        self.related_rules = []
118
        self.rules = []
119
120
    def __hash__(self):
121
        """ Controls are meant to be unique, so using the
122
        ID should suffice"""
123
        return hash(self.id)
124
125
    @classmethod
126
    def _check_keys(cls, control_dict):
127
        for key in control_dict.keys():
128
            # Rules shouldn't be in KEYS that data is in selections
129
            if key not in cls.KEYS.keys() and key not in ['rules', ]:
130
                raise ValueError("Key %s is not allowed in a control file." % key)
131
132
    @classmethod
133
    def from_control_dict(cls, control_dict, env_yaml=None, default_level=["default"]):
134
        cls._check_keys(control_dict)
135
        control = cls()
136
        control.id = ssg.utils.required_key(control_dict, "id")
137
        control.title = control_dict.get("title")
138
        control.description = control_dict.get("description")
139
        control.rationale = control_dict.get("rationale")
140
        control.status = Status.from_control_info(control.id, control_dict.get("status", None))
141
        control.automated = control_dict.get("automated", "no")
142
        control.status_justification = control_dict.get('status_justification')
143
        control.artifact_description = control_dict.get('artifact_description')
144
        control.mitigation = control_dict.get('mitigation')
145
        control.fixtext = control_dict.get('fixtext')
146
        control.check = control_dict.get('check')
147
        control.tickets = control_dict.get('tickets')
148
        control.original_title = control_dict.get('original_title')
149
        control.related_rules = control_dict.get('related_rules')
150
        control.rules = control_dict.get('rules')
151
152
        if control.status == "automated":
153
            control.automated = "yes"
154
        if control.automated not in ["yes", "no", "partially"]:
155
            msg = (
156
                "Invalid value '%s' of automated key in control "
157
                "%s '%s'. Can be only 'yes', 'no', 'partially'."
158
                % (control.automated,  control.id, control.title))
159
            raise ValueError(msg)
160
        control.levels = control_dict.get("levels", default_level)
161
        control.notes = control_dict.get("notes", "")
162
        selections = control_dict.get("rules", {})
163
164
        control.selections = selections
165
166
        control.related_rules = control_dict.get("related_rules", [])
167
        control.rules = control_dict.get("rules", [])
168
        return control
169
170
    def represent_as_dict(self):
171
        data = super(Control, self).represent_as_dict()
172
        data["rules"] = self.selections
173
        data["controls"] = self.controls
174
        return data
175
176
177
class Level(ssg.entities.common.XCCDFEntity):
178
    KEYS = dict(
179
        id=lambda: str,
180
        inherits_from=lambda: None,
181
    )
182
183
    def __init__(self):
184
        self.id = None
185
        self.inherits_from = None
186
187
    @classmethod
188
    def from_level_dict(cls, level_dict):
189
        level = cls()
190
        level.id = ssg.utils.required_key(level_dict, "id")
191
        level.inherits_from = level_dict.get("inherits_from")
192
        return level
193
194
195
class Policy(ssg.entities.common.XCCDFEntity):
196
    def __init__(self, filepath, env_yaml=None):
197
        self.id = None
198
        self.env_yaml = env_yaml
199
        self.filepath = filepath
200
        self.controls_dir = os.path.splitext(filepath)[0]
201
        self.controls = []
202
        self.controls_by_id = dict()
203
        self.levels = []
204
        self.levels_by_id = dict()
205
        self.title = ""
206
        self.source = ""
207
208
    def represent_as_dict(self):
209
        data = dict()
210
        data["id"] = self.id
211
        data["title"] = self.title
212
        data["source"] = self.source
213
        data["definition_location"] = self.filepath
214
        data["controls"] = [c.represent_as_dict() for c in self.controls]
215
        data["levels"] = [l.represent_as_dict() for l in self.levels]
216
        return data
217
218
    @property
219
    def default_level(self):
220
        result = ["default"]
221
        if self.levels:
222
            result = [self.levels[0].id]
223
        return result
224
225
    def check_all_rules_exist(self, existing_rules):
226
        for c in self.controls:
227
            nonexisting_rules = set(c.selected) - existing_rules
228
            if nonexisting_rules:
229
                msg = "Control %s:%s contains nonexisting rule(s) %s" % (
230
                    self.id, c.id, ", ".join(nonexisting_rules))
231
                raise ValueError(msg)
232
233
    def remove_selections_not_known(self, known_rules):
234
        for c in self.controls:
235
            selections = set(c.selected).intersection(known_rules)
236
            c.selected = list(selections)
237
            unselections = set(c.unselected).intersection(known_rules)
238
            c.unselected = list(unselections)
239
240
    def _create_control_from_subtree(self, subtree):
241
        try:
242
            control = Control.from_control_dict(
243
                subtree, self.env_yaml, default_level=self.default_level)
244
        except Exception as exc:
245
            msg = (
246
                "Unable to parse controls from {filename}: {error}"
247
                .format(filename=self.filepath, error=str(exc)))
248
            raise RuntimeError(msg)
249
        return control
250
251
    def _extract_and_record_subcontrols(self, current_control, controls_tree):
252
        subcontrols = []
253
        if "controls" not in controls_tree:
254
            return subcontrols
255
256
        for control_def_or_ref in controls_tree["controls"]:
257
            if isinstance(control_def_or_ref, str):
258
                control_ref = control_def_or_ref
259
                current_control.controls.append(control_ref)
260
                continue
261
            control_def = control_def_or_ref
262
            for sc in self._parse_controls_tree([control_def]):
263
                current_control.controls.append(sc.id)
264
                current_control.update_with(sc)
265
                subcontrols.append(sc)
266
        return subcontrols
267
268
    def _parse_controls_tree(self, tree):
269
        controls = []
270
271
        for control_subtree in tree:
272
            control = self._create_control_from_subtree(control_subtree)
273
            subcontrols = self._extract_and_record_subcontrols(control, control_subtree)
274
            controls.extend(subcontrols)
275
            controls.append(control)
276
        return controls
277
278
    def save_controls_tree(self, tree):
279
        for c in self._parse_controls_tree(tree):
280
            self.controls.append(c)
281
            self.controls_by_id[c.id] = c
282
283
    def _parse_file_into_control_trees(self, dirname, basename):
284
        controls_trees = []
285
        if basename.endswith('.yml'):
286
            full_path = os.path.join(dirname, basename)
287
            yaml_contents = ssg.yaml.open_and_expand(full_path, self.env_yaml)
288
            for control in yaml_contents['controls']:
289
                controls_trees.append(control)
290
        elif basename.startswith('.'):
291
            pass
292
        else:
293
            raise RuntimeError("Found non yaml file in %s" % self.controls_dir)
294
        return controls_trees
295
296
    def _load_from_subdirectory(self, yaml_contents):
297
        controls_tree = yaml_contents.get("controls", list())
298
        files = os.listdir(self.controls_dir)
299
        for file in files:
300
            trees = self._parse_file_into_control_trees(self.controls_dir, file)
301
            controls_tree.extend(trees)
302
        return controls_tree
303
304
    def load(self):
305
        yaml_contents = ssg.yaml.open_and_expand(self.filepath, self.env_yaml)
306
        controls_dir = yaml_contents.get("controls_dir")
307
        if controls_dir:
308
            self.controls_dir = os.path.join(os.path.dirname(self.filepath), controls_dir)
309
        self.id = ssg.utils.required_key(yaml_contents, "id")
310
        self.title = ssg.utils.required_key(yaml_contents, "title")
311
        self.source = yaml_contents.get("source", "")
312
313
        level_list = yaml_contents.get("levels", [])
314
        for lv in level_list:
315
            level = Level.from_level_dict(lv)
316
            self.levels.append(level)
317
            self.levels_by_id[level.id] = level
318
319
        if os.path.exists(self.controls_dir) and os.path.isdir(self.controls_dir):
320
            controls_tree = self._load_from_subdirectory(yaml_contents)
321
        else:
322
            controls_tree = ssg.utils.required_key(yaml_contents, "controls")
323
        self.save_controls_tree(controls_tree)
324
325
    def get_control(self, control_id):
326
        try:
327
            c = self.controls_by_id[control_id]
328
            return c
329
        except KeyError:
330
            msg = "%s not found in policy %s" % (
331
                control_id, self.id
332
            )
333
            raise ValueError(msg)
334
335
    def get_level(self, level_id):
336
        try:
337
            lv = self.levels_by_id[level_id]
338
            return lv
339
        except KeyError:
340
            msg = "Level %s not found in policy %s" % (
341
                level_id, self.id
342
            )
343
            raise ValueError(msg)
344
345
    def get_level_with_ancestors_sequence(self, level_id):
346
        # use OrderedDict for Python2 compatibility instead of ordered set
347
        levels = collections.OrderedDict()
348
        level = self.get_level(level_id)
349
        levels[level] = ""
350
        if level.inherits_from:
351
            for lv in level.inherits_from:
352
                eligible_levels = [l for l in self.get_level_with_ancestors_sequence(lv) if l not in levels.keys()]
353
                for l in eligible_levels:
354
                    levels[l] = ""
355
        return list(levels.keys())
356
357
358
class ControlsManager():
359
    def __init__(self, controls_dir, env_yaml=None, existing_rules=None):
360
        self.controls_dir = os.path.abspath(controls_dir)
361
        self.env_yaml = env_yaml
362
        self.existing_rules = existing_rules
363
        self.policies = {}
364
365
    def load(self):
366
        if not os.path.exists(self.controls_dir):
367
            return
368
        for filename in sorted(glob(os.path.join(self.controls_dir, "*.yml"))):
369
            filepath = os.path.join(self.controls_dir, filename)
370
            policy = Policy(filepath, self.env_yaml)
371
            policy.load()
372
            self.policies[policy.id] = policy
373
        self.check_all_rules_exist()
374
        self.resolve_controls()
375
376
    def check_all_rules_exist(self):
377
        if self.existing_rules is None:
378
            return
379
        for p in self.policies.values():
380
            p.check_all_rules_exist(self.existing_rules)
381
382
    def remove_selections_not_known(self, known_rules):
383
        known_rules = set(known_rules)
384
        for p in self.policies.values():
385
            p.remove_selections_not_known(known_rules)
386
387
    def resolve_controls(self):
388
        for pid, policy in self.policies.items():
389
            for control in policy.controls:
390
                self._resolve_control(pid, control)
391
392
    def _resolve_control(self, pid, control):
393
        for sub_name in control.controls:
394
            policy_id = pid
395
            if ":" in sub_name:
396
                policy_id, sub_name = sub_name.split(":", 1)
397
            subcontrol = self.get_control(policy_id, sub_name)
398
            self._resolve_control(pid, subcontrol)
399
            control.update_with(subcontrol)
400
401
    def get_control(self, policy_id, control_id):
402
        policy = self._get_policy(policy_id)
403
        control = policy.get_control(control_id)
404
        return control
405
406
    def _get_policy(self, policy_id):
407
        try:
408
            policy = self.policies[policy_id]
409
        except KeyError:
410
            msg = "policy '%s' doesn't exist" % (policy_id)
411
            raise ValueError(msg)
412
        return policy
413
414
    def get_all_controls_of_level(self, policy_id, level_id):
415
        policy = self._get_policy(policy_id)
416
        levels = policy.get_level_with_ancestors_sequence(level_id)
417
        all_policy_controls = self.get_all_controls(policy_id)
418
        eligible_controls = []
419
        already_defined_variables = set()
420
        # we will go level by level, from top to bottom
421
        # this is done to enable overriding of variables by higher levels
422
        for lv in levels:
423
            for control in all_policy_controls:
424
                if lv.id not in control.levels:
425
                    continue
426
427
                variables = set(control.variables.keys())
428
429
                variables_to_remove = variables.intersection(already_defined_variables)
430
                already_defined_variables.update(variables)
431
432
                new_c = self._get_control_without_variables(variables_to_remove, control)
433
                eligible_controls.append(new_c)
434
435
        return eligible_controls
436
437
    @staticmethod
438
    def _get_control_without_variables(variables_to_remove, control):
439
        if not variables_to_remove:
440
            return control
441
442
        new_c = copy.deepcopy(control)
443
        for var in variables_to_remove:
444
            del new_c.variables[var]
445
        return new_c
446
447
    def get_all_controls(self, policy_id):
448
        policy = self._get_policy(policy_id)
449
        return policy.controls_by_id.values()
450
451
    def save_everything(self, output_dir):
452
        ssg.utils.mkdir_p(output_dir)
453
        for policy_id, policy in self.policies.items():
454
            filename = os.path.join(output_dir, "{}.{}".format(policy_id, "yml"))
455
            policy.dump_yaml(filename)
456