LDAPGroupManager.add_group()   C
last analyzed

Complexity

Conditions 11

Size

Total Lines 65
Code Lines 55

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 11
eloc 55
nop 14
dl 0
loc 65
rs 5.2527
c 0
b 0
f 0

How to fix   Long Method    Complexity    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like octoprint_auth_ldap.group_manager.LDAPGroupManager.add_group() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
# coding=utf-8
2
from __future__ import absolute_import
3
4
import io
5
import os
6
import re
7
8
import yaml
9
from octoprint.access.groups import FilebasedGroupManager, Group, GroupAlreadyExists
10
from octoprint.access.permissions import Permissions, OctoPrintPermission
11
from octoprint.util import atomic_write
12
from octoprint_auth_ldap.constants import OU, OU_FILTER, DISTINGUISHED_NAME, LDAP_PARENT_GROUP_NAME, \
13
    LDAP_PARENT_GROUP_DESCRIPTION, LDAP_PARENT_GROUP_KEY, LDAP_GROUP_KEY_PREFIX
14
from octoprint_auth_ldap.group import LDAPGroup
15
from octoprint_auth_ldap.ldap import DependentOnLDAPConnection
16
from octoprint_auth_ldap.tweaks import DependentOnSettingsPlugin
17
from octoprint_auth_ldap.user import LDAPUser
18
19
20
class LDAPGroupManager(FilebasedGroupManager, DependentOnSettingsPlugin, DependentOnLDAPConnection):
21
22
    def __init__(self, plugin, ldap, path=None):
23
        DependentOnSettingsPlugin.__init__(self, plugin)
24
        DependentOnLDAPConnection.__init__(self, ldap)
25
        FilebasedGroupManager.__init__(self, path)
26
27
    def add_group(
28
        self,
29
        key,
30
        name,
31
        description,
32
        permissions,
33
        subgroups,
34
        default=False,
35
        removable=True,
36
        changeable=True,
37
        toggleable=True,
38
        overwrite=False,
39
        notify=True,
40
        save=True,
41
        dn=None
42
    ):
43
        if dn is None:
44
            FilebasedGroupManager.add_group(
45
                self,
46
                key=key,
47
                name=name,
48
                description=description,
49
                permissions=permissions,
50
                subgroups=subgroups,
51
                default=default,
52
                removable=False if key == LDAP_PARENT_GROUP_KEY else removable,
53
                changeable=True if key == LDAP_PARENT_GROUP_KEY else changeable,
54
                toggleable=toggleable,
55
                overwrite=overwrite,
56
                notify=notify,
57
                save=save
58
            )
59
        else:
60
            if key in self._groups and not overwrite:
61
                raise GroupAlreadyExists(key)
62
63
            if not permissions:
64
                permissions = []
65
66
            permissions = self._to_permissions(*permissions)
67
            assert (all(map(lambda p: isinstance(p, OctoPrintPermission), permissions)))
68
69
            subgroups = self._to_groups(*subgroups)
70
            assert (all(map(lambda g: isinstance(g, Group), subgroups)))
71
72
            group = LDAPGroup(
73
                key=key,
74
                name=name,
75
                description=description,
76
                permissions=permissions,
77
                subgroups=subgroups,
78
                default=default,
79
                changeable=True,
80
                removable=False,
81
                dn=dn
82
            )
83
            self._groups[key] = group
84
            self.logger.debug("Added group %s as %s" % (name, LDAPGroup.__name__))
85
86
            if save:
87
                self._dirty = True
88
                self._save()
89
90
            if notify:
91
                self._notify_listeners("added", group)
92
93
    def _to_group_key(self, ou_common_name):
94
        return "%s%s" % (
95
            self.settings.get([LDAP_GROUP_KEY_PREFIX]), re.sub(r"\W+", "_", ou_common_name.strip().lower()))
96
97
    def _refresh_ldap_groups(self):
98
        ou = self.settings.get([OU])
99
        if ou is not None or ou == "":  # FIXME allowing empty string settings is dumb
100
            self.logger.info("Syncing LDAP groups to local groups based on %s settings" % self.plugin.identifier)
101
102
            try:
103
                self.add_group(key=self.settings.get([LDAP_PARENT_GROUP_KEY]),
104
                               name=self.settings.get([LDAP_PARENT_GROUP_NAME]),
105
                               description=self.settings.get([LDAP_PARENT_GROUP_DESCRIPTION]),
106
                               permissions=[],
107
                               subgroups=[],
108
                               overwrite=False
109
                               )
110
            except GroupAlreadyExists:
111
                assert True
112
113
            organizational_units = [group.strip() for group in str(self.settings.get([OU])).split(",")]
114
            ldap_groups = [group.get_name() for group in self._groups.values() if isinstance(group, LDAPGroup)]
115
            ou_filter = self.settings.get([OU_FILTER])
116
117
            for ou_common_name in list(set(organizational_units) - set(ldap_groups)):
118
                key = self._to_group_key(ou_common_name)
119
                this_group = self.find_group(key)
120
                if this_group is None:
121
                    result = self.ldap.search("(" + ou_filter % ou_common_name.strip() + ")")
122
                    self.add_group(key=key,
123
                                   name=ou_common_name,
124
                                   dn=result[DISTINGUISHED_NAME],
125
                                   description="Synced LDAP Group",
126
                                   permissions=[],
127
                                   subgroups=[],
128
                                   toggleable=True,
129
                                   removable=False,
130
                                   changeable=True,
131
                                   save=False
132
                                   )
133
134
            self.update_group(
135
                self.settings.get([LDAP_PARENT_GROUP_KEY]),
136
                subgroups=[group for group in self._groups.values() if isinstance(group, LDAPGroup)],
137
                save=True
138
            )
139
140
    def get_ldap_groups_for(self, dn):
141
        if isinstance(dn, LDAPUser):
142
            dn = dn.distinguished_name
143
        self._refresh_ldap_groups()
144
        memberships = self.ldap.get_ou_memberships_for(dn)
145
        if memberships is False:
146
            return []
147
        return list(map(lambda g: self._to_group_key(g), memberships))
148
149
    def _load(self):
150
        if os.path.exists(self._groupfile) and os.path.isfile(self._groupfile):
151
            try:
152
                with io.open(self._groupfile, 'rt', encoding='utf-8') as f:
153
                    data = yaml.safe_load(f)
154
155
                if "groups" not in data:
156
                    groups = data
157
                    data = dict(groups=groups)
158
159
                groups = data.get("groups", dict())
160
                tracked_permissions = data.get("tracked", list())
161
162
                for key, attributes in groups.items():
163
                    if key in self._groups:
164
                        # group is already there (from the defaults most likely)
165
                        if not self._groups[key].is_changeable():
166
                            # group may not be changed -> bail
167
                            continue
168
169
                        removable = self._groups[key].is_removable()
170
                        changeable = self._groups[key].is_changeable()
171
                        toggleable = self._groups[key].is_toggleable()
172
                    else:
173
                        removable = True
174
                        changeable = True
175
                        toggleable = True
176
177
                    permissions = self._to_permissions(*attributes.get("permissions", []))
178
                    default_permissions = self.default_permissions_for_group(key)
179
                    for permission in default_permissions:
180
                        if permission.key not in tracked_permissions and permission not in permissions:
181
                            permissions.append(permission)
182
183
                    subgroups = attributes.get("subgroups", [])
184
185
                    group_type = attributes.get("type", False)
186
187
                    if group_type == LDAPGroup.GROUP_TYPE:
188
                        self.logger.debug("Loading group %s as %s" % (attributes.get("name", key), LDAPGroup.__name__))
189
                        group = LDAPGroup(
190
                            key,
191
                            attributes.get("name", key),
192
                            description=attributes.get("description", ""),
193
                            permissions=permissions,
194
                            subgroups=subgroups,
195
                            default=attributes.get("default", False),
196
                            removable=False,
197
                            changeable=changeable,
198
                            toggleable=toggleable,
199
                            dn=attributes.get(DISTINGUISHED_NAME, None)
200
                        )
201
                    else:
202
                        self.logger.debug("Loading group %s as %s" % (attributes.get("name", key), Group.__name__))
203
                        group = Group(key, attributes.get("name", ""),
204
                                      description=attributes.get("description", ""),
205
                                      permissions=permissions,
206
                                      subgroups=subgroups,
207
                                      default=attributes.get("default", False),
208
                                      removable=removable,
209
                                      changeable=changeable,
210
                                      toggleable=toggleable)
211
                    self._groups[key] = group
212
213
                for group in self._groups.values():
214
                    group._subgroups = self._to_groups(*group._subgroups)
215
216
            except Exception:
217
                self.logger.exception("Error while loading groups from file {}".format(self._groupfile))
218
219
    def _save(self, force=False):
220
        if self._groupfile is None or not self._dirty and not force:
221
            return
222
223
        groups = dict()
224
        for key, group in self._groups.items():
225
            if not group or not isinstance(group, Group):
226
                self.logger.debug('Not saving %s' % key)
227
                continue
228
229
            if isinstance(group, LDAPGroup):
230
                self.logger.debug("Saving group %s as %s" % (group.get_name(), LDAPGroup.__name__))
231
                groups[key] = dict(
232
                    type=LDAPGroup.GROUP_TYPE,
233
                    dn=group.distinguished_name,
234
235
                    name=group.get_name(),
236
                    description=group.get_description(),
237
                    permissions=self._from_permissions(*group.permissions),
238
                    subgroups=self._from_groups(*group.subgroups),
239
                    default=group.is_default()
240
                )
241
            else:
242
                self.logger.debug("Saving group %s as %s" % (group.get_name(), Group.__name__))
243
                groups[key] = dict(
244
                    name=group._name,
245
                    description=group._description,
246
                    permissions=self._from_permissions(*group._permissions),
247
                    subgroups=self._from_groups(*group._subgroups),
248
                    default=group._default
249
                )
250
251
        data = dict(groups=groups,
252
                    tracked=[x.key for x in Permissions.all()])
253
254
        with atomic_write(self._groupfile, mode='wt', permissions=0o600, max_permissions=0o666) as f:
255
            import yaml
256
            yaml.safe_dump(data, f, default_flow_style=False, indent=4, allow_unicode=True)
257
            self._dirty = False
258
        self._load()
259