Test Failed
Push — master ( 5f96f0...be9a44 )
by Jan
02:37 queued 15s
created

ssg.controls.Control.__hash__()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.125

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 4
ccs 1
cts 2
cp 0.5
crap 1.125
rs 10
c 0
b 0
f 0
1 2
import collections
2 2
import logging
3 2
import os
4 2
import copy
5 2
from glob import glob
6
7 2
import ssg.build_yaml
8 2
import ssg.yaml
9 2
import ssg.utils
10
11 2
from ssg.rules import get_rule_path_by_id
12
13
14 2
class InvalidStatus(Exception):
    """Raised when a control declares a status that is not a known one."""
16
17 2
class Status():
    """Implementation status of a control.

    The class attributes enumerate every accepted status string.  An
    instance wraps one such string and compares equal both to other
    ``Status`` instances and to plain strings holding the same value.
    """
    PENDING = "pending"
    PLANNED = "planned"
    NOT_APPLICABLE = "not applicable"
    INHERENTLY_MET = "inherently met"
    DOCUMENTATION = "documentation"
    PARTIAL = "partial"
    SUPPORTED = "supported"
    AUTOMATED = "automated"

    def __init__(self, status):
        self.status = status

    @classmethod
    def from_control_info(cls, ctrl, status):
        """Validate the status string declared by a control.

        :param ctrl: identifier of the control, used only in the error message
        :param status: the declared status string, or None when not declared
        :returns: the validated status as a plain string (not a ``Status``
            instance); ``Status.PENDING`` when ``status`` is None
        :raises InvalidStatus: when ``status`` is not one of the known values
        """
        if status is None:
            return cls.PENDING

        valid_statuses = [
            cls.PENDING,
            cls.PLANNED,
            cls.NOT_APPLICABLE,
            cls.INHERENTLY_MET,
            cls.DOCUMENTATION,
            cls.PARTIAL,
            cls.SUPPORTED,
            cls.AUTOMATED,
        ]

        if status not in valid_statuses:
            raise InvalidStatus(
                    "The given status '{given}' in the control '{control}' "
                    "was invalid. Please use one of "
                    "the following: {valid}".format(given=status,
                                                    control=ctrl,
                                                    valid=valid_statuses))
        return status

    def __str__(self):
        return self.status

    def __eq__(self, other):
        if isinstance(other, Status):
            return self.status == other.status
        elif isinstance(other, str):
            return self.status == other
        return False

    def __ne__(self, other):
        # Python 2 does not derive != from ==, so spell it out explicitly.
        return not self.__eq__(other)

    def __hash__(self):
        # Defining __eq__ alone makes instances unhashable on Python 3.
        # Hash like the wrapped string so that objects which compare equal
        # (a Status and the equivalent str) also hash equal.
        return hash(self.status)
64
65
66 2
class Control():
    """A single control of a policy together with its rule selections."""

    def __init__(self):
        self.id = None
        self.rules = []
        self.variables = {}
        self.levels = []
        self.notes = ""
        self.title = ""
        self.description = ""
        self.automated = ""
        self.status = None
        # from_control_dict() also fills these in; initialise them here so
        # that a bare Control() exposes the same attributes.
        self.related_rules = []
        self.note = None

    def __hash__(self):
        """ Controls are meant to be unique, so using the
        ID should suffice"""
        return hash(self.id)

    @classmethod
    def from_control_dict(cls, control_dict, env_yaml=None, default_level=None):
        """Build a Control from its dictionary (parsed YAML) definition.

        :param control_dict: mapping describing the control; the ``id`` key
            is required, every other key is optional
        :param env_yaml: optional product environment mapping.  When it
            defines ``product``, each selected rule is looked up under
            ``product_dir``/``benchmark_root`` and kept only if its prodtype
            is ``"all"`` or contains the product
        :param default_level: levels assigned when the control does not
            declare any; ``["default"]`` when omitted
        :returns: the populated Control
        :raises ValueError: when the ``automated`` value is not one of
            ``yes``, ``no`` or ``partially``
        """
        if default_level is None:
            default_level = ["default"]

        control = cls()
        control.id = ssg.utils.required_key(control_dict, "id")
        control.title = control_dict.get("title")
        control.description = control_dict.get("description")
        control.status = Status.from_control_info(control.id, control_dict.get("status", None))
        control.automated = control_dict.get("automated", "no")
        if control.status == "automated":
            control.automated = "yes"
        if control.automated not in ["yes", "no", "partially"]:
            msg = (
                "Invalid value '%s' of automated key in control "
                "%s '%s'. Can be only 'yes', 'no', 'partially'."
                % (control.automated,  control.id, control.title))
            raise ValueError(msg)
        if "levels" in control_dict:
            control.levels = control_dict["levels"]
        else:
            # Copy, so that controls falling back to the default do not all
            # share (and potentially mutate) one list object.
            control.levels = list(default_level)
        control.notes = control_dict.get("notes", "")
        selections = control_dict.get("rules", [])

        product = None
        content_dir = None
        if env_yaml:
            product = env_yaml.get('product', None)
            if product is not None:
                # The content directory is only needed for the per-product
                # rule lookup below.  Building it unconditionally failed in
                # os.path.join() for an env_yaml without product information.
                content_dir = os.path.join(
                    env_yaml.get('product_dir', None),
                    env_yaml.get('benchmark_root', None))

        for item in selections:
            if "=" in item:
                # "name=value" selections set a variable instead of a rule
                varname, value = item.split("=", 1)
                control.variables[varname] = value
                continue

            if product is None:
                # The product was not specified, simply add the rule
                control.rules.append(item)
                continue

            # Check if rule is applicable to product, i.e.: prodtype has product id
            rule_yaml = get_rule_path_by_id(content_dir, item)
            if rule_yaml is None:
                # item not found in benchmark_root
                continue
            rule = ssg.build_yaml.Rule.from_yaml(rule_yaml, env_yaml)
            if rule.prodtype == "all" or product in rule.prodtype:
                control.rules.append(item)
            else:
                logging.info("Rule {item} doesn't apply to {product}".format(
                    item=item,
                    product=product))

        control.related_rules = control_dict.get("related_rules", [])
        control.note = control_dict.get("note")
        return control
137
138
139 2
class Level():
    """A level of a policy, optionally inheriting from other levels."""

    def __init__(self):
        self.id = None
        # Presumably a list of parent level ids (or None) -- callers iterate
        # over it; confirm against the policy file format.
        self.inherits_from = None

    @classmethod
    def from_level_dict(cls, level_dict):
        """Build a Level from its dictionary definition.

        :param level_dict: mapping with a required ``id`` key and an
            optional ``inherits_from`` key
        :returns: the populated Level
        """
        level = cls()
        level.id = ssg.utils.required_key(level_dict, "id")
        level.inherits_from = level_dict.get("inherits_from")
        return level
150
151 2
class Policy():
    """A policy loaded from one controls file: its levels and controls."""

    def __init__(self, filepath, env_yaml=None):
        self.id = None
        self.env_yaml = env_yaml
        self.filepath = filepath
        self.controls = []
        self.controls_by_id = dict()
        self.levels = []
        self.levels_by_id = dict()
        self.title = ""
        self.source = ""

    @staticmethod
    def _extend_unique(target, items):
        """Append to ``target`` every item of ``items`` it does not hold yet."""
        for item in items:
            if item not in target:
                target.append(item)

    def _parse_controls_tree(self, tree):
        """Yield every control of ``tree``, nested controls before parents.

        A parent control accumulates the rules, variables and related rules
        of all controls nested below it.

        :param tree: iterable of control dictionaries; a dictionary may hold
            nested controls under its ``controls`` key
        :raises RuntimeError: when a control definition cannot be parsed
        """
        default_level = ["default"]
        if self.levels:
            # the first declared level is the default one
            default_level = [self.levels[0].id]

        for node in tree:
            try:
                # Hand every control its own copy of the default levels.
                control = Control.from_control_dict(
                    node, self.env_yaml, default_level=list(default_level))
            except Exception as exc:
                msg = (
                    "Unable to parse controls from {filename}: {error}"
                    .format(filename=self.filepath, error=str(exc)))
                raise RuntimeError(msg)
            if "controls" in node:
                for sc in self._parse_controls_tree(node["controls"]):
                    yield sc
                    # The recursion yields *all* descendants, and each of
                    # them already carries the selections of its own
                    # descendants.  Merge without duplicates, otherwise a
                    # rule of a deeply nested control is added repeatedly.
                    self._extend_unique(control.rules, sc.rules)
                    control.variables.update(sc.variables)
                    self._extend_unique(control.related_rules, sc.related_rules)
            yield control

    def load(self):
        """Read the policy file and populate id, title, levels and controls."""
        yaml_contents = ssg.yaml.open_and_expand(self.filepath, self.env_yaml)
        self.id = ssg.utils.required_key(yaml_contents, "id")
        self.title = ssg.utils.required_key(yaml_contents, "title")
        self.source = yaml_contents.get("source", "")

        level_list = yaml_contents.get("levels", [])
        for lv in level_list:
            level = Level.from_level_dict(lv)
            self.levels.append(level)
            self.levels_by_id[level.id] = level

        controls_tree = ssg.utils.required_key(yaml_contents, "controls")
        for c in self._parse_controls_tree(controls_tree):
            self.controls.append(c)
            self.controls_by_id[c.id] = c

    def get_control(self, control_id):
        """Return the control with the given id.

        :raises ValueError: when the policy has no such control
        """
        try:
            return self.controls_by_id[control_id]
        except KeyError:
            msg = "%s not found in policy %s" % (
                control_id, self.id
            )
            raise ValueError(msg)

    def get_level(self, level_id):
        """Return the level with the given id.

        :raises ValueError: when the policy has no such level
        """
        try:
            return self.levels_by_id[level_id]
        except KeyError:
            msg = "Level %s not found in policy %s" % (
                level_id, self.id
            )
            raise ValueError(msg)

    def get_level_with_ancestors_sequence(self, level_id):
        """Return the level followed by all levels it inherits from.

        The requested level comes first and every level appears only once,
        at the position where it was first encountered.  There is no cycle
        detection, so the inheritance graph has to be acyclic.

        :raises ValueError: when a referenced level does not exist
        """
        # use OrderedDict for Python2 compatibility instead of ordered set
        levels = collections.OrderedDict()
        level = self.get_level(level_id)
        levels[level] = ""
        if level.inherits_from:
            for parent_id in level.inherits_from:
                for ancestor in self.get_level_with_ancestors_sequence(parent_id):
                    if ancestor not in levels:
                        levels[ancestor] = ""
        return list(levels.keys())
233
234
235 2
class ControlsManager():
    """Loads every policy file of a directory and answers control queries."""

    def __init__(self, controls_dir, env_yaml=None):
        self.controls_dir = os.path.abspath(controls_dir)
        self.env_yaml = env_yaml
        self.policies = {}

    def load(self):
        """Load all ``*.yml`` policies of the controls directory.

        Does nothing when the directory does not exist.
        """
        if not os.path.exists(self.controls_dir):
            return
        # controls_dir is absolute, hence glob() already yields full paths
        for filepath in sorted(glob(os.path.join(self.controls_dir, "*.yml"))):
            logging.info("Found file %s", filepath)
            policy = Policy(filepath, self.env_yaml)
            policy.load()
            self.policies[policy.id] = policy

    def get_control(self, policy_id, control_id):
        """Return the control ``control_id`` of the policy ``policy_id``.

        :raises ValueError: when the policy or the control does not exist
        """
        return self._get_policy(policy_id).get_control(control_id)

    def _get_policy(self, policy_id):
        """Return the loaded policy with the given id.

        :raises ValueError: when no such policy was loaded
        """
        try:
            return self.policies[policy_id]
        except KeyError:
            msg = "policy '%s' doesn't exist" % (policy_id)
            raise ValueError(msg)

    def get_all_controls_of_level(self, policy_id, level_id):
        """Return the controls of a level and of all levels it inherits from.

        A variable set by a control visited earlier is removed from the
        controls visited later (those are returned as modified copies).

        :raises ValueError: when the policy or the level does not exist
        """
        policy = self._get_policy(policy_id)
        levels = policy.get_level_with_ancestors_sequence(level_id)
        all_policy_controls = self.get_all_controls(policy_id)
        eligible_controls = []
        already_defined_variables = set()
        # we will go level by level, from top to bottom
        # this is done to enable overriding of variables by higher levels
        for lv in levels:
            for control in all_policy_controls:
                if lv.id not in control.levels:
                    continue

                variables = set(control.variables.keys())

                variables_to_remove = variables.intersection(already_defined_variables)
                already_defined_variables.update(variables)

                new_c = self._get_control_without_variables(variables_to_remove, control)
                eligible_controls.append(new_c)

        return eligible_controls

    @staticmethod
    def _get_control_without_variables(variables_to_remove, control):
        """Return ``control`` stripped of the given variables.

        The control itself is returned when there is nothing to remove;
        otherwise a deep copy is modified and the original stays untouched.
        """
        if not variables_to_remove:
            return control

        new_c = copy.deepcopy(control)
        for var in variables_to_remove:
            del new_c.variables[var]
        return new_c

    def get_all_controls(self, policy_id):
        """Return all controls of the given policy.

        :raises ValueError: when no such policy was loaded
        """
        policy = self._get_policy(policy_id)
        return policy.controls_by_id.values()
304