Passed
Push — master ( 13b0af...ea42c3 )
by Jan
13:56 queued 11:45
created

ControlsManager.get_all_controls_of_level_at_least()   A

Complexity

Conditions 1

Size

Total Lines 9
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 8
nop 3
dl 0
loc 9
ccs 6
cts 6
cp 1
crap 1
rs 10
c 0
b 0
f 0
1 2
import collections
2 2
import logging
3 2
import os
4 2
from glob import glob
5
6 2
import ssg.yaml
7 2
import ssg.utils
8
9
10 2
class Control():
    """A single control (requirement) of a policy.

    Holds the identifier and descriptive texts of the control together with
    the rules and variable assignments selected for it.
    """

    def __init__(self):
        self.id = None
        self.rules = []
        self.variables = {}
        self.level = ""
        self.notes = ""
        self.title = ""
        self.description = ""
        self.automated = ""
        # Created here as well so that every instance has the same set of
        # attributes, not only those built by from_control_dict().
        self.related_rules = []
        self.note = None

    @classmethod
    def from_control_dict(cls, control_dict, default_level=""):
        """Build a Control from its dictionary representation.

        Keys read from ``control_dict``:

        * ``id`` - fetched through ``ssg.utils.required_key``
          (presumably fails when the key is missing; confirm in ssg.utils)
        * ``title``, ``description``, ``note`` - ``None`` when absent
        * ``automated`` - defaults to ``"yes"``
        * ``level`` - defaults to ``default_level``
        * ``notes`` - defaults to ``""``
        * ``rules`` - entries containing ``=`` are split at the first ``=``
          into a variable name and value; all other entries are stored as
          rules
        * ``related_rules`` - defaults to an empty list

        Raises ValueError when ``automated`` is not one of ``yes``, ``no``
        or ``partially``.
        """
        control = cls()
        control.id = ssg.utils.required_key(control_dict, "id")
        control.title = control_dict.get("title")
        control.description = control_dict.get("description")
        control.automated = control_dict.get("automated", "yes")
        if control.automated not in ["yes", "no", "partially"]:
            msg = (
                "Invalid value '%s' of automated key in control "
                "%s '%s'. Can be only 'yes', 'no', 'partially'."
                % (control.automated,  control.id, control.title))
            raise ValueError(msg)
        control.level = control_dict.get("level", default_level)
        control.notes = control_dict.get("notes", "")
        selections = control_dict.get("rules", [])
        for item in selections:
            if "=" in item:
                varname, value = item.split("=", 1)
                control.variables[varname] = value
            else:
                control.rules.append(item)
        # Copy the list: a parent control extends its related_rules in place,
        # which must not modify the list owned by the input dictionary.
        control.related_rules = list(control_dict.get("related_rules", []))
        control.note = control_dict.get("note")
        return control
46
47
48 2
class Policy():
    """A policy file: its metadata, its ordered levels and its controls."""

    def __init__(self, filepath, env_yaml=None):
        # Where the policy comes from and how its YAML is expanded.
        self.filepath = filepath
        self.env_yaml = env_yaml
        # Metadata, filled in by load().
        self.id = None
        self.title = ""
        self.source = ""
        # Level names and the position of each name in the levels list.
        self.levels = []
        self.level_value = {}
        # Controls in the order they were parsed, plus a lookup by id.
        self.controls = []
        self.controls_by_id = dict()

    def _parse_controls_tree(self, tree):
        """Yield a Control for every node of ``tree``, children first.

        Nested controls (key ``controls`` of a node) are yielded before the
        control that contains them. The rules, variables and related rules
        of each nested control are merged into the containing control.
        Controls without an explicit level get the first policy level, or
        an empty string when the policy has no levels.
        """
        fallback_level = self.levels[0] if self.levels else ""

        for node in tree:
            parent = Control.from_control_dict(
                node, default_level=fallback_level)
            if "controls" not in node:
                yield parent
                continue

            for child in self._parse_controls_tree(node["controls"]):
                yield child
                parent.rules.extend(child.rules)
                parent.variables.update(child.variables)
                parent.related_rules.extend(child.related_rules)
            yield parent

    def load(self):
        """Read the policy file and populate this object from it.

        ``id``, ``title`` and ``controls`` are fetched through
        ``ssg.utils.required_key``; ``source`` defaults to an empty string
        and ``levels`` to ``["default"]``.
        """
        contents = ssg.yaml.open_and_expand(self.filepath, self.env_yaml)
        self.id = ssg.utils.required_key(contents, "id")
        self.title = ssg.utils.required_key(contents, "title")
        self.source = contents.get("source", "")

        self.levels = contents.get("levels", ["default"])
        # Remember the position of every level name in the levels list.
        position = 0
        for level_name in self.levels:
            self.level_value[level_name] = position
            position += 1

        tree = ssg.utils.required_key(contents, "controls")
        for control in self._parse_controls_tree(tree):
            self.controls.append(control)
            self.controls_by_id[control.id] = control

    def get_control(self, control_id):
        """Return the control with the given id.

        Raises ValueError when the policy has no control with that id.
        """
        try:
            found = self.controls_by_id[control_id]
        except KeyError:
            raise ValueError(
                "%s not found in policy %s" % (control_id, self.id))
        return found
100
101
102 2
class ControlsManager():
    """Load all policy files of a directory and answer queries on them."""

    def __init__(self, controls_dir, env_yaml=None):
        self.controls_dir = os.path.abspath(controls_dir)
        self.env_yaml = env_yaml
        # Loaded policies keyed by policy id.
        self.policies = {}

    def load(self):
        """Load every ``*.yml`` file found directly in the controls directory.

        Does nothing when the directory does not exist. Policies are stored
        in ``self.policies`` keyed by their id, so a later file replaces an
        earlier one that has the same policy id.
        """
        if not os.path.exists(self.controls_dir):
            return
        # glob() already returns paths prefixed with controls_dir, so no
        # further joining is needed. Its result order is arbitrary; sorting
        # makes it deterministic which file wins on a duplicate policy id.
        for filepath in sorted(glob(os.path.join(self.controls_dir, "*.yml"))):
            logging.info("Found file %s", filepath)
            policy = Policy(filepath, self.env_yaml)
            policy.load()
            self.policies[policy.id] = policy

    def get_control(self, policy_id, control_id):
        """Return control ``control_id`` of policy ``policy_id``.

        Raises ValueError when the policy is not loaded; the lookup of the
        control itself is delegated to the policy's ``get_control``.
        """
        return self._get_policy(policy_id).get_control(control_id)

    def _get_policy(self, policy_id):
        """Return the loaded policy with the given id or raise ValueError."""
        try:
            policy = self.policies[policy_id]
        except KeyError:
            msg = "policy '%s' doesn't exist" % (policy_id)
            raise ValueError(msg)
        return policy

    def _get_policy_level_value(self, policy, level_id):
        """Return the position of ``level_id`` within the policy's levels.

        A false ``level_id`` (``None`` or empty string) maps to 0.
        Raises ValueError when ``level_id`` is not one of ``policy.levels``.
        """
        if not level_id:
            return 0

        if level_id not in policy.levels:
            msg = (
                "Control level {level_id} not compatible "
                "with policy {policy_id}"
                .format(level_id=level_id, policy_id=policy.id)
            )
            raise ValueError(msg)
        return policy.level_value[level_id]

    def get_all_controls_of_level_at_least(self, policy_id, level_id=None):
        """Return the controls of a policy selected by level.

        A control is selected when the position of its level in the policy's
        level list is lower than or equal to the position of ``level_id``.
        When ``level_id`` is omitted, position 0 is used.

        Raises ValueError for an unknown policy or an unknown ``level_id``,
        and KeyError when a control carries a level that is missing from
        ``policy.level_value``.
        """
        policy = self._get_policy(policy_id)
        level = self._get_policy_level_value(policy, level_id)

        all_policy_controls = self.get_all_controls(policy_id)
        eligible_controls = [
            c for c in all_policy_controls
            if policy.level_value[c.level] <= level]
        return eligible_controls

    def get_all_controls(self, policy_id):
        """Return the values of the policy's ``controls_by_id`` mapping.

        Raises ValueError when the policy is not loaded.
        """
        policy = self._get_policy(policy_id)
        return policy.controls_by_id.values()
161