Test Failed
Pull Request — master (#11135)
by Jan
02:45
created

ssg.entities.profile   A

Complexity

Total Complexity 25

Size/Duplication

Total Lines 107
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 0
Metric Value
eloc 79
dl 0
loc 107
ccs 0
cts 68
cp 0
rs 10
c 0
b 0
f 0
wmc 25

10 Methods

Rating   Name   Duplication   Size   Complexity  
A ResolvableProfile.resolve_controls() 0 2 1
A ResolvableProfile.__init__() 0 3 1
A ResolvableProfile.extend_by() 0 2 1
A ResolvableProfile._controls_ids_to_controls() 0 3 1
A ProfileWithInlinePolicies.apply_selection() 0 7 3
A ProfileWithInlinePolicies.resolve_controls() 0 8 3
A ProfileWithInlinePolicies.__init__() 0 3 1
A ResolvableProfile.apply_filter() 0 8 3
A ProfileWithInlinePolicies._process_controls_ids_into_controls() 0 14 4
B ResolvableProfile.resolve() 0 39 7
1
from collections import defaultdict
2
3
from .profile_base import Profile
4
5
6
class ResolvableProfile(Profile):
    """Profile that can be resolved against other profiles and known rules.

    Resolution (see resolve()) merges in the extended profile, drops
    unselected rules, validates that every selected rule exists and applies
    the rule filter.

    NOTE(review): ``selected``, ``unselected``, ``extends``, ``id_``,
    ``update_with`` and ``rule_filter`` are not defined in this class; they
    are presumably provided by the ``Profile`` base class - confirm there.
    """

    def __init__(self, * args, ** kwargs):
        super(ResolvableProfile, self).__init__(* args, ** kwargs)
        # Flipped to True at the end of resolve() so repeated calls are no-ops.
        self.resolved = False

    def _controls_ids_to_controls(self, controls_manager, policy_id, control_id_list):
        """Return the controls of ``policy_id`` named by ``control_id_list``.

        Each ID is looked up with ``controls_manager.get_control(policy_id, cid)``
        and the results are returned as a list in the order of the input IDs.
        """
        items = [controls_manager.get_control(policy_id, cid) for cid in control_id_list]
        return items

    def resolve_controls(self, controls_manager):
        """Hook invoked by resolve() when a controls manager is supplied.

        Does nothing in this class; subclasses may override it.
        """
        pass

    def extend_by(self, extended_profile):
        """Merge ``extended_profile`` into this profile via ``update_with``."""
        self.update_with(extended_profile)

    def apply_filter(self, rules_by_id):
        """Keep only the selected rules accepted by ``self.rule_filter``.

        ``rules_by_id`` maps rule IDs to rule objects; every ID currently in
        ``self.selected`` must be present in it (a missing ID raises
        ``KeyError``).

        Duplicated IDs are collapsed and the relative order of the first
        occurrences is preserved, so an already sorted selection stays sorted
        and the result does not depend on set iteration order.
        """
        kept = []
        seen = set()
        for rid in self.selected:
            if rid in seen:
                continue
            seen.add(rid)
            if self.rule_filter(rules_by_id[rid]):
                kept.append(rid)
        self.selected = kept

    def resolve(self, all_profiles, rules_by_id, controls_manager=None):
        """Resolve this profile in place; a second call is a no-op.

        Args:
            all_profiles: mapping of profile ID to profile object, used to
                look up the profile named by ``self.extends``.
            rules_by_id: mapping of rule ID to rule object.
            controls_manager: optional object passed to resolve_controls();
                the hook is skipped when this is falsy.

        Raises:
            RuntimeError: ``self.extends`` names a profile that is not in
                ``all_profiles``.
            ValueError: a selected rule ID is not in ``rules_by_id``.
        """
        if self.resolved:
            return

        if controls_manager:
            self.resolve_controls(controls_manager)

        if self.extends:
            if self.extends not in all_profiles:
                msg = (
                    "Profile {name} extends profile {extended}, but "
                    "only profiles {known_profiles} are available for resolution."
                    .format(name=self.id_, extended=self.extends,
                            known_profiles=list(all_profiles.keys())))
                raise RuntimeError(msg)
            extended_profile = all_profiles[self.extends]
            # The parent has to be fully resolved before it is merged in.
            extended_profile.resolve(all_profiles, rules_by_id, controls_manager)

            self.extend_by(extended_profile)

        # De-duplicate and drop everything that was explicitly unselected.
        self.selected = [s for s in set(self.selected) if s not in self.unselected]

        # Both have been consumed above; clear them on the resolved profile.
        self.unselected = []
        self.extends = None

        self.selected = sorted(self.selected)

        for rid in self.selected:
            if rid not in rules_by_id:
                msg = (
                    "Rule {rid} is selected by {profile}, but the rule is not available. "
                    "This may be caused by a discrepancy of prodtypes."
                    .format(rid=rid, profile=self.id_))
                raise ValueError(msg)

        # Every selected ID is known to exist at this point, so the lookups
        # inside apply_filter() cannot fail; the sorted order is kept.
        self.apply_filter(rules_by_id)

        self.resolved = True
69
70
71
class ProfileWithInlinePolicies(ResolvableProfile):
    """Resolvable profile whose selections may reference policy controls.

    Selections of the form ``<policy_id>:<control_id>`` are collected per
    policy in ``controls_by_policy`` and expanded into controls by
    resolve_controls().
    """

    def __init__(self, * args, ** kwargs):
        super(ProfileWithInlinePolicies, self).__init__(* args, ** kwargs)
        # policy ID -> list of control selectors recorded by apply_selection()
        self.controls_by_policy = defaultdict(list)

    def apply_selection(self, item):
        """Record a control selection, or defer to the parent implementation.

        An item containing ":" but no "=" is split at the first ":" into a
        policy ID and a control selector, which is appended to
        ``controls_by_policy``. Any other item is passed to the parent's
        ``apply_selection``.
        """
        # ":" is the delimiter for controls but not when the item is a variable
        if ":" in item and "=" not in item:
            policy_id, control_id = item.split(":", 1)
            self.controls_by_policy[policy_id].append(control_id)
        else:
            super(ProfileWithInlinePolicies, self).apply_selection(item)

    def _process_controls_ids_into_controls(self, controls_manager, policy_id, controls_ids):
        """Expand the control selectors of one policy into a list of controls.

        Recognised selectors:
          * ``all`` - every control of the policy
            (``controls_manager.get_all_controls``);
          * ``all:<level_id>`` - every control of the given level
            (``controls_manager.get_all_controls_of_level``);
          * anything else - a single control ID, looked up through
            ``_controls_ids_to_controls``.

        Only the exact ``all`` keyword and the ``all:`` prefix are treated
        as wildcards, so a control ID that merely begins with "all" is
        looked up as an ordinary control.
        """
        controls = []
        for cid in controls_ids:
            if cid == "all":
                controls.extend(
                    controls_manager.get_all_controls(policy_id))
            elif cid.startswith("all:"):
                _, level_id = cid.split(":", 1)
                controls.extend(
                    controls_manager.get_all_controls_of_level(policy_id, level_id))
            else:
                controls.extend(
                    self._controls_ids_to_controls(controls_manager, policy_id, [cid]))
        return controls

    def resolve_controls(self, controls_manager):
        """Merge every control referenced by this profile into the profile.

        Sets ``self.policies`` to the list of referenced policy IDs, then
        expands the selectors of each policy and passes each resulting
        control to ``update_with``.
        """
        self.policies = list(self.controls_by_policy.keys())
        for policy_id, controls_ids in self.controls_by_policy.items():
            controls = self._process_controls_ids_into_controls(
                controls_manager, policy_id, controls_ids)

            for c in controls:
                self.update_with(c)
107