octoprint_auth_ldap.group_manager   B
last analyzed

Complexity

Total Complexity 46

Size/Duplication

Total Lines 259
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 208
dl 0
loc 259
rs 8.72
c 0
b 0
f 0
wmc 46

7 Methods

Rating   Name   Duplication   Size   Complexity  
A LDAPGroupManager._to_group_key() 0 3 1
C LDAPGroupManager.add_group() 0 65 11
A LDAPGroupManager.__init__() 0 4 1
C LDAPGroupManager._save() 0 40 9
A LDAPGroupManager.get_ldap_groups_for() 0 8 4
B LDAPGroupManager._refresh_ldap_groups() 0 41 6
F LDAPGroupManager._load() 0 69 14

How to fix   Complexity   

Complexity

Complex classes like octoprint_auth_ldap.group_manager often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# coding=utf-8
2
from __future__ import absolute_import
3
4
import io
5
import os
6
import re
7
8
import yaml
9
from octoprint.access.groups import FilebasedGroupManager, Group, GroupAlreadyExists
10
from octoprint.access.permissions import Permissions, OctoPrintPermission
11
from octoprint.util import atomic_write
12
from octoprint_auth_ldap.constants import OU, OU_FILTER, DISTINGUISHED_NAME, LDAP_PARENT_GROUP_NAME, \
13
    LDAP_PARENT_GROUP_DESCRIPTION, LDAP_PARENT_GROUP_KEY, LDAP_GROUP_KEY_PREFIX
14
from octoprint_auth_ldap.group import LDAPGroup
15
from octoprint_auth_ldap.ldap import DependentOnLDAPConnection
16
from octoprint_auth_ldap.tweaks import DependentOnSettingsPlugin
17
from octoprint_auth_ldap.user import LDAPUser
18
19
20
class LDAPGroupManager(FilebasedGroupManager, DependentOnSettingsPlugin, DependentOnLDAPConnection):
21
22
    def __init__(self, plugin, ldap, path=None):
23
        DependentOnSettingsPlugin.__init__(self, plugin)
24
        DependentOnLDAPConnection.__init__(self, ldap)
25
        FilebasedGroupManager.__init__(self, path)
26
27
    def add_group(
28
        self,
29
        key,
30
        name,
31
        description,
32
        permissions,
33
        subgroups,
34
        default=False,
35
        removable=True,
36
        changeable=True,
37
        toggleable=True,
38
        overwrite=False,
39
        notify=True,
40
        save=True,
41
        dn=None
42
    ):
43
        if dn is None:
44
            FilebasedGroupManager.add_group(
45
                self,
46
                key=key,
47
                name=name,
48
                description=description,
49
                permissions=permissions,
50
                subgroups=subgroups,
51
                default=default,
52
                removable=False if key == LDAP_PARENT_GROUP_KEY else removable,
53
                changeable=True if key == LDAP_PARENT_GROUP_KEY else changeable,
54
                toggleable=toggleable,
55
                overwrite=overwrite,
56
                notify=notify,
57
                save=save
58
            )
59
        else:
60
            if key in self._groups and not overwrite:
61
                raise GroupAlreadyExists(key)
62
63
            if not permissions:
64
                permissions = []
65
66
            permissions = self._to_permissions(*permissions)
67
            assert (all(map(lambda p: isinstance(p, OctoPrintPermission), permissions)))
68
69
            subgroups = self._to_groups(*subgroups)
70
            assert (all(map(lambda g: isinstance(g, Group), subgroups)))
71
72
            group = LDAPGroup(
73
                key=key,
74
                name=name,
75
                description=description,
76
                permissions=permissions,
77
                subgroups=subgroups,
78
                default=default,
79
                changeable=True,
80
                removable=False,
81
                dn=dn
82
            )
83
            self._groups[key] = group
84
            self.logger.debug("Added group %s as %s" % (name, LDAPGroup.__name__))
85
86
            if save:
87
                self._dirty = True
88
                self._save()
89
90
            if notify:
91
                self._notify_listeners("added", group)
92
93
    def _to_group_key(self, ou_common_name):
94
        return "%s%s" % (
95
            self.settings.get([LDAP_GROUP_KEY_PREFIX]), re.sub(r"\W+", "_", ou_common_name.strip().lower()))
96
97
    def _refresh_ldap_groups(self):
98
        ou = self.settings.get([OU])
99
        if ou is not None or ou == "":  # FIXME allowing empty string settings is dumb
100
            self.logger.info("Syncing LDAP groups to local groups based on %s settings" % self.plugin.identifier)
101
102
            try:
103
                self.add_group(key=self.settings.get([LDAP_PARENT_GROUP_KEY]),
104
                               name=self.settings.get([LDAP_PARENT_GROUP_NAME]),
105
                               description=self.settings.get([LDAP_PARENT_GROUP_DESCRIPTION]),
106
                               permissions=[],
107
                               subgroups=[],
108
                               overwrite=False
109
                               )
110
            except GroupAlreadyExists:
111
                assert True
112
113
            organizational_units = [group.strip() for group in str(self.settings.get([OU])).split(",")]
114
            ldap_groups = [group.get_name() for group in self._groups.values() if isinstance(group, LDAPGroup)]
115
            ou_filter = self.settings.get([OU_FILTER])
116
117
            for ou_common_name in list(set(organizational_units) - set(ldap_groups)):
118
                key = self._to_group_key(ou_common_name)
119
                this_group = self.find_group(key)
120
                if this_group is None:
121
                    result = self.ldap.search("(" + ou_filter % ou_common_name.strip() + ")")
122
                    self.add_group(key=key,
123
                                   name=ou_common_name,
124
                                   dn=result[DISTINGUISHED_NAME],
125
                                   description="Synced LDAP Group",
126
                                   permissions=[],
127
                                   subgroups=[],
128
                                   toggleable=True,
129
                                   removable=False,
130
                                   changeable=True,
131
                                   save=False
132
                                   )
133
134
            self.update_group(
135
                self.settings.get([LDAP_PARENT_GROUP_KEY]),
136
                subgroups=[group for group in self._groups.values() if isinstance(group, LDAPGroup)],
137
                save=True
138
            )
139
140
    def get_ldap_groups_for(self, dn):
141
        if isinstance(dn, LDAPUser):
142
            dn = dn.distinguished_name
143
        self._refresh_ldap_groups()
144
        memberships = self.ldap.get_ou_memberships_for(dn)
145
        if memberships is False:
146
            return []
147
        return list(map(lambda g: self._to_group_key(g), memberships))
148
149
    def _load(self):
150
        if os.path.exists(self._groupfile) and os.path.isfile(self._groupfile):
151
            try:
152
                with io.open(self._groupfile, 'rt', encoding='utf-8') as f:
153
                    data = yaml.safe_load(f)
154
155
                if "groups" not in data:
156
                    groups = data
157
                    data = dict(groups=groups)
158
159
                groups = data.get("groups", dict())
160
                tracked_permissions = data.get("tracked", list())
161
162
                for key, attributes in groups.items():
163
                    if key in self._groups:
164
                        # group is already there (from the defaults most likely)
165
                        if not self._groups[key].is_changeable():
166
                            # group may not be changed -> bail
167
                            continue
168
169
                        removable = self._groups[key].is_removable()
170
                        changeable = self._groups[key].is_changeable()
171
                        toggleable = self._groups[key].is_toggleable()
172
                    else:
173
                        removable = True
174
                        changeable = True
175
                        toggleable = True
176
177
                    permissions = self._to_permissions(*attributes.get("permissions", []))
178
                    default_permissions = self.default_permissions_for_group(key)
179
                    for permission in default_permissions:
180
                        if permission.key not in tracked_permissions and permission not in permissions:
181
                            permissions.append(permission)
182
183
                    subgroups = attributes.get("subgroups", [])
184
185
                    group_type = attributes.get("type", False)
186
187
                    if group_type == LDAPGroup.GROUP_TYPE:
188
                        self.logger.debug("Loading group %s as %s" % (attributes.get("name", key), LDAPGroup.__name__))
189
                        group = LDAPGroup(
190
                            key,
191
                            attributes.get("name", key),
192
                            description=attributes.get("description", ""),
193
                            permissions=permissions,
194
                            subgroups=subgroups,
195
                            default=attributes.get("default", False),
196
                            removable=False,
197
                            changeable=changeable,
198
                            toggleable=toggleable,
199
                            dn=attributes.get(DISTINGUISHED_NAME, None)
200
                        )
201
                    else:
202
                        self.logger.debug("Loading group %s as %s" % (attributes.get("name", key), Group.__name__))
203
                        group = Group(key, attributes.get("name", ""),
204
                                      description=attributes.get("description", ""),
205
                                      permissions=permissions,
206
                                      subgroups=subgroups,
207
                                      default=attributes.get("default", False),
208
                                      removable=removable,
209
                                      changeable=changeable,
210
                                      toggleable=toggleable)
211
                    self._groups[key] = group
212
213
                for group in self._groups.values():
214
                    group._subgroups = self._to_groups(*group._subgroups)
215
216
            except Exception:
217
                self.logger.exception("Error while loading groups from file {}".format(self._groupfile))
218
219
    def _save(self, force=False):
220
        if self._groupfile is None or not self._dirty and not force:
221
            return
222
223
        groups = dict()
224
        for key, group in self._groups.items():
225
            if not group or not isinstance(group, Group):
226
                self.logger.debug('Not saving %s' % key)
227
                continue
228
229
            if isinstance(group, LDAPGroup):
230
                self.logger.debug("Saving group %s as %s" % (group.get_name(), LDAPGroup.__name__))
231
                groups[key] = dict(
232
                    type=LDAPGroup.GROUP_TYPE,
233
                    dn=group.distinguished_name,
234
235
                    name=group.get_name(),
236
                    description=group.get_description(),
237
                    permissions=self._from_permissions(*group.permissions),
238
                    subgroups=self._from_groups(*group.subgroups),
239
                    default=group.is_default()
240
                )
241
            else:
242
                self.logger.debug("Saving group %s as %s" % (group.get_name(), Group.__name__))
243
                groups[key] = dict(
244
                    name=group._name,
245
                    description=group._description,
246
                    permissions=self._from_permissions(*group._permissions),
247
                    subgroups=self._from_groups(*group._subgroups),
248
                    default=group._default
249
                )
250
251
        data = dict(groups=groups,
252
                    tracked=[x.key for x in Permissions.all()])
253
254
        with atomic_write(self._groupfile, mode='wt', permissions=0o600, max_permissions=0o666) as f:
255
            import yaml
256
            yaml.safe_dump(data, f, default_flow_style=False, indent=4, allow_unicode=True)
257
            self._dirty = False
258
        self._load()
259