Test Failed
Push — master ( 6424ac...10b554 )
by Jan
03:55 queued 11s
created

ssg.controls.Policy.get_level_with_ancestors()   A

Complexity

Conditions 3

Size

Total Lines 8
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 8
nop 2
dl 0
loc 8
ccs 8
cts 8
cp 1
crap 3
rs 10
c 0
b 0
f 0
1 2
import collections
2 2
import logging
3 2
import os
4 2
from glob import glob
5
6 2
import ssg.build_yaml
7 2
import ssg.yaml
8 2
import ssg.utils
9
10 2
from ssg.rules import get_rule_path_by_id
11
12
13 2
class Control():
14 2
    def __init__(self):
15 2
        self.id = None
16 2
        self.rules = []
17 2
        self.variables = {}
18 2
        self.levels = []
19 2
        self.notes = ""
20 2
        self.title = ""
21 2
        self.description = ""
22 2
        self.automated = ""
23
24 2
    @classmethod
25 2
    def from_control_dict(cls, control_dict, env_yaml=None, default_level=["default"]):
26 2
        control = cls()
27 2
        control.id = ssg.utils.required_key(control_dict, "id")
28 2
        control.title = control_dict.get("title")
29 2
        control.description = control_dict.get("description")
30 2
        control.automated = control_dict.get("automated", "yes")
31 2
        if control.automated not in ["yes", "no", "partially"]:
32
            msg = (
33
                "Invalid value '%s' of automated key in control "
34
                "%s '%s'. Can be only 'yes', 'no', 'partially'."
35
                % (control.automated,  control.id, control.title))
36
            raise ValueError(msg)
37 2
        control.levels = control_dict.get("levels", default_level)
38 2
        control.notes = control_dict.get("notes", "")
39 2
        selections = control_dict.get("rules", [])
40
41 2
        product = None
42 2
        product_dir = None
43 2
        benchmark_root = None
44 2
        if env_yaml:
45
            product = env_yaml.get('product', None)
46
            product_dir = env_yaml.get('product_dir', None)
47
            benchmark_root = env_yaml.get('benchmark_root', None)
48
            content_dir = os.path.join(product_dir, benchmark_root)
49
50 2
        for item in selections:
51 2
            if "=" in item:
52 2
                varname, value = item.split("=", 1)
53 2
                control.variables[varname] = value
54
            else:
55
                # Check if rule is applicable to product, i.e.: prodtype has product id
56 2
                if product is None:
57
                    # The product was not specified, simply add the rule
58 2
                    control.rules.append(item)
59
                else:
60
                    rule_yaml = get_rule_path_by_id(content_dir, item)
0 ignored issues
show
introduced by
The variable content_dir does not seem to be defined in case env_yaml on line 44 is False. Are you sure this can never be the case?
Loading history...
61
                    if rule_yaml is None:
62
                        # item not found in benchmark_root
63
                        continue
64
                    rule = ssg.build_yaml.Rule.from_yaml(rule_yaml, env_yaml)
65
                    if rule.prodtype == "all" or product in rule.prodtype:
66
                        control.rules.append(item)
67
                    else:
68
                        logging.info("Rule {item} doesn't apply to {product}".format(
69
                            item=item,
70
                            product=product))
71
72 2
        control.related_rules = control_dict.get("related_rules", [])
73 2
        control.note = control_dict.get("note")
74 2
        return control
75
76
77 2
class Level():
78 2
    def __init__(self):
79 2
        self.id = None
80 2
        self.inherits_from = None
81
82 2
    @classmethod
83
    def from_level_dict(cls, level_dict):
84 2
        level = cls()
85 2
        level.id = ssg.utils.required_key(level_dict, "id")
86 2
        level.inherits_from = level_dict.get("inherits_from")
87 2
        return level
88
89 2
class Policy():
90 2
    def __init__(self, filepath, env_yaml=None):
91 2
        self.id = None
92 2
        self.env_yaml = env_yaml
93 2
        self.filepath = filepath
94 2
        self.controls = []
95 2
        self.controls_by_id = dict()
96 2
        self.levels = []
97 2
        self.levels_by_id = dict()
98 2
        self.title = ""
99 2
        self.source = ""
100
101 2
    def _parse_controls_tree(self, tree):
102 2
        default_level = ["default"]
103 2
        if self.levels:
104 2
            default_level = [self.levels[0].id]
105
106 2
        for node in tree:
107 2
            control = Control.from_control_dict(
108
                node, self.env_yaml, default_level=default_level)
109 2
            if "controls" in node:
110 2
                for sc in self._parse_controls_tree(node["controls"]):
111 2
                    yield sc
112 2
                    control.rules.extend(sc.rules)
113 2
                    control.variables.update(sc.variables)
114 2
                    control.related_rules.extend(sc.related_rules)
115 2
            yield control
116
117 2
    def load(self):
118 2
        yaml_contents = ssg.yaml.open_and_expand(self.filepath, self.env_yaml)
119 2
        self.id = ssg.utils.required_key(yaml_contents, "id")
120 2
        self.title = ssg.utils.required_key(yaml_contents, "title")
121 2
        self.source = yaml_contents.get("source", "")
122
123 2
        level_list = yaml_contents.get("levels", [])
124 2
        for lv in level_list:
125 2
            level = Level.from_level_dict(lv)
126 2
            self.levels.append(level)
127 2
            self.levels_by_id[level.id] = level
128
129 2
        controls_tree = ssg.utils.required_key(yaml_contents, "controls")
130 2
        for c in self._parse_controls_tree(controls_tree):
131 2
            self.controls.append(c)
132 2
            self.controls_by_id[c.id] = c
133
134 2
    def get_control(self, control_id):
135 2
        try:
136 2
            c = self.controls_by_id[control_id]
137 2
            return c
138
        except KeyError:
139
            msg = "%s not found in policy %s" % (
140
                control_id, self.id
141
            )
142
            raise ValueError(msg)
143
144 2
    def get_level(self, level_id):
145 2
        try:
146 2
            lv = self.levels_by_id[level_id]
147 2
            return lv
148
        except KeyError:
149
            msg = "Level %s not found in policy %s" % (
150
                level_id, self.id
151
            )
152
            raise ValueError(msg)
153
154 2
    def get_level_with_ancestors(self, level_id):
155 2
        levels = set()
156 2
        level = self.get_level(level_id)
157 2
        levels.add(level)
158 2
        if level.inherits_from:
159 2
            for lv in level.inherits_from:
160 2
                levels.update(self.get_level_with_ancestors(lv))
161 2
        return levels
162
163
164
165 2
class ControlsManager():
166 2
    def __init__(self, controls_dir, env_yaml=None):
167 2
        self.controls_dir = os.path.abspath(controls_dir)
168 2
        self.env_yaml = env_yaml
169 2
        self.policies = {}
170
171 2
    def load(self):
172 2
        if not os.path.exists(self.controls_dir):
173
            return
174 2
        for filename in sorted(glob(os.path.join(self.controls_dir, "*.yml"))):
175 2
            logging.info("Found file %s" % (filename))
176 2
            filepath = os.path.join(self.controls_dir, filename)
177 2
            policy = Policy(filepath, self.env_yaml)
178 2
            policy.load()
179 2
            self.policies[policy.id] = policy
180
181 2
    def get_control(self, policy_id, control_id):
182 2
        try:
183 2
            policy = self.policies[policy_id]
184
        except KeyError:
185
            msg = "policy '%s' doesn't exist" % (policy_id)
186
            raise ValueError(msg)
187 2
        control = policy.get_control(control_id)
188 2
        return control
189
190 2
    def _get_policy(self, policy_id):
191 2
        try:
192 2
            policy = self.policies[policy_id]
193
        except KeyError:
194
            msg = "policy '%s' doesn't exist" % (policy_id)
195
            raise ValueError(msg)
196 2
        return policy
197
198 2
    def get_all_controls_of_level(self, policy_id, level_id):
199 2
        policy = self._get_policy(policy_id)
200 2
        levels = policy.get_level_with_ancestors(level_id)
201 2
        level_ids = set([lv.id for lv in levels])
202
203 2
        all_policy_controls = self.get_all_controls(policy_id)
204 2
        eligible_controls = []
205 2
        for c in all_policy_controls:
206 2
            if len(level_ids.intersection(c.levels)) > 0:
207 2
                eligible_controls.append(c)
208 2
        return eligible_controls
209
210 2
    def get_all_controls(self, policy_id):
211 2
        policy = self._get_policy(policy_id)
212
        return policy.controls_by_id.values()
213