Test Failed
Push — master ( 6149c6...e6e408 )
by Matthew
02:55 queued 14s
created

ResolvableProfile.resolve_selections_with_rules()   A

Complexity

Conditions 4

Size

Total Lines 10
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 0
Metric Value
cc 4
eloc 10
nop 2
dl 0
loc 10
ccs 0
cts 10
cp 0
crap 20
rs 9.9
c 0
b 0
f 0
1
from collections import defaultdict
2
3
from .profile_base import Profile
4
5
6
class ResolvableProfile(Profile):
7
    def __init__(self, * args, ** kwargs):
8
        super(ResolvableProfile, self).__init__(* args, ** kwargs)
9
        self.resolved = False
10
11
    def _controls_ids_to_controls(self, controls_manager, policy_id, control_id_list):
12
        items = [controls_manager.get_control(policy_id, cid) for cid in control_id_list]
13
        return items
14
15
    def resolve_controls(self, controls_manager):
16
        pass
17
18
    def extend_by(self, extended_profile):
19
        self.update_with(extended_profile)
20
21
    def apply_filter(self, rules_by_id):
22
        selections = set()
23
        for rid in self.selected:
24
            rule = rules_by_id[rid]
25
            if not self.rule_filter(rule):
26
                continue
27
            selections.add(rid)
28
        self.selected = list(selections)
29
30
    def resolve(self, all_profiles, rules_by_id, controls_manager=None):
31
        if self.resolved:
32
            return
33
34
        if controls_manager:
35
            self.resolve_controls(controls_manager)
36
37
        if self.extends:
38
            if self.extends not in all_profiles:
39
                msg = (
40
                    "Profile {name} extends profile {extended}, but "
41
                    "only profiles {known_profiles} are available for resolution."
42
                    .format(name=self.id_, extended=self.extends,
43
                            known_profiles=list(all_profiles.keys())))
44
                raise RuntimeError(msg)
45
            extended_profile = all_profiles[self.extends]
46
            extended_profile.resolve(all_profiles, rules_by_id, controls_manager)
47
48
            self.extend_by(extended_profile)
49
50
        self.selected = [s for s in set(self.selected) if s not in self.unselected]
51
52
        self.unselected = []
53
        self.extends = None
54
55
        self.selected = sorted(self.selected)
56
57
        for rid in self.selected:
58
            if rid not in rules_by_id:
59
                msg = (
60
                    "Rule {rid} is selected by {profile}, but the rule is not available. "
61
                    "This may be caused by a discrepancy of prodtypes."
62
                    .format(rid=rid, profile=self.id_))
63
                raise ValueError(msg)
64
65
        self.apply_filter(rules_by_id)
66
67
        self.resolved = True
68
69
70
class ProfileWithInlinePolicies(ResolvableProfile):
71
    def __init__(self, * args, ** kwargs):
72
        super(ProfileWithInlinePolicies, self).__init__(* args, ** kwargs)
73
        self.controls_by_policy = defaultdict(list)
74
75
    def apply_selection(self, item):
76
        # ":" is the delimiter for controls but not when the item is a variable
77
        if ":" in item and "=" not in item:
78
            policy_id, control_id = item.split(":", 1)
79
            self.controls_by_policy[policy_id].append(control_id)
80
        else:
81
            super(ProfileWithInlinePolicies, self).apply_selection(item)
82
83
    def _process_controls_ids_into_controls(self, controls_manager, policy_id, controls_ids):
84
        controls = []
85
        for cid in controls_ids:
86
            if not cid.startswith("all"):
87
                controls.extend(
88
                    self._controls_ids_to_controls(controls_manager, policy_id, [cid]))
89
            elif ":" in cid:
90
                _, level_id = cid.split(":", 1)
91
                controls.extend(
92
                    controls_manager.get_all_controls_of_level(policy_id, level_id))
93
            else:
94
                controls.extend(
95
                    controls_manager.get_all_controls(policy_id))
96
        return controls
97
98
    def resolve_controls(self, controls_manager):
99
        self.policies = list(self.controls_by_policy.keys())
100
        for policy_id, controls_ids in self.controls_by_policy.items():
101
            controls = self._process_controls_ids_into_controls(
102
                controls_manager, policy_id, controls_ids)
103
104
            for c in controls:
105
                self.update_with(c)
106