octoprint_auth_ldap.user_manager   B
last analyzed

Complexity

Total Complexity 50

Size/Duplication

Total Lines 260
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 199
dl 0
loc 260
rs 8.4
c 0
b 0
f 0
wmc 50

11 Methods

Rating   Name   Duplication   Size   Complexity  
A LDAPUserManager.__init__() 0 4 1
A LDAPUserManager.group_manager() 0 3 1
B LDAPUserManager.add_user() 0 42 6
A LDAPUserManager._find_user_via_ldap() 0 31 4
A LDAPUserManager._find_user_with_transformation() 0 10 5
B LDAPUserManager.check_password() 0 19 6
A LDAPUserManager.find_user() 0 7 3
A LDAPUserManager.refresh_ldap_group_memberships_for() 0 7 2
A LDAPUserManager.refresh_ldap_group_memberships() 0 3 3
B LDAPUserManager._save() 0 48 8
C LDAPUserManager._load() 0 55 11

How to fix   Complexity   

Complexity

Complex classes like octoprint_auth_ldap.user_manager often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# coding=utf-8
2
from __future__ import absolute_import
3
4
import io
5
import os
6
7
import yaml
8
from ldap.filter import filter_format
9
from octoprint.access.users import FilebasedUserManager, User, UserAlreadyExists
10
from octoprint.util import atomic_write
11
from octoprint_auth_ldap.constants import LOCAL_CACHE, SEARCH_FILTER, SEARCH_TERM_TRANSFORM, DISTINGUISHED_NAME, OU
12
from octoprint_auth_ldap.group import LDAPGroup
13
from octoprint_auth_ldap.group_manager import LDAPGroupManager
14
from octoprint_auth_ldap.ldap import DependentOnLDAPConnection
15
from octoprint_auth_ldap.tweaks import DependentOnSettingsPlugin
16
from octoprint_auth_ldap.user import LDAPUser
17
18
19
class LDAPUserManager(FilebasedUserManager, DependentOnSettingsPlugin, DependentOnLDAPConnection):
20
21
    def __init__(self, plugin, ldap, **kwargs):
22
        DependentOnSettingsPlugin.__init__(self, plugin)
23
        DependentOnLDAPConnection.__init__(self, ldap)
24
        FilebasedUserManager.__init__(self, group_manager=LDAPGroupManager(plugin=plugin, ldap=ldap), **kwargs)
25
26
    @property
27
    def group_manager(self):
28
        return self._group_manager
29
30
    def find_user(self, userid=None, apikey=None, session=None):
31
        self.logger.debug("Search for userid=%s, apiKey=%s, session=%s" % (userid, apikey, session))
32
        user = FilebasedUserManager.find_user(self, userid=userid, apikey=apikey, session=session)
33
        user, userid = self._find_user_with_transformation(apikey, session, user, userid)
34
        if not user and userid:
35
            user = self._find_user_via_ldap(user, userid)
36
        return user
37
38
    def _find_user_via_ldap(self, user, userid):
39
        self.logger.debug("User %s not found locally, treating as LDAP" % userid)
40
        search_filter = self.settings.get([SEARCH_FILTER])
41
        self.group_manager._refresh_ldap_groups()
42
        """
43
                operating on the wildly unsafe assumption that the admin who configures this plugin will have their head
44
                screwed on right and we are NOT escaping their search strings... only escaping unsafe user-entered text that
45
                is passed directly to search filters
46
                """
47
        ldap_user = self.ldap.search(filter_format(search_filter, (userid,)))
48
        if ldap_user is not None:
49
            self.logger.debug("User %s found as dn=%s" % (userid, ldap_user[DISTINGUISHED_NAME]))
50
            groups = self._group_manager.get_ldap_groups_for(ldap_user[DISTINGUISHED_NAME])
51
            if isinstance(groups, list):
52
                self.logger.debug("Creating new LDAPUser %s" % userid)
53
                if self.settings.get([LOCAL_CACHE]):
54
                    self.add_user(
55
                        username=userid,
56
                        dn=ldap_user[DISTINGUISHED_NAME],
57
                        groups=groups,
58
                        active=True
59
                    )
60
                    user = self._users[userid]
61
                else:
62
                    user = LDAPUser(
63
                        username=userid,
64
                        dn=ldap_user[DISTINGUISHED_NAME],
65
                        groups=groups,
66
                        active=True
67
                    )
68
        return user
69
70
    def _find_user_with_transformation(self, apikey, session, user, userid):
71
        transformation = self.settings.get([SEARCH_TERM_TRANSFORM])
72
        if not user and userid and transformation:
73
            self.logger.debug("Transforming %s using %s" % (userid, transformation))
74
            transformed = getattr(str, transformation)(str(userid))
75
            self.logger.debug("Search for user userid=%s" % transformed)
76
            if transformed != userid:
77
                userid = transformed
78
                user = FilebasedUserManager.find_user(self, userid=userid, apikey=apikey, session=session)
79
        return user, userid
80
81
    def add_user(self,
82
                 username,
83
                 password=None,
84
                 active=False,
85
                 permissions=None,
86
                 groups=None,
87
                 apikey=None,
88
                 overwrite=False,
89
                 dn=None):
90
        if dn is None:
91
            FilebasedUserManager.add_user(
92
                self,
93
                username=username,
94
                password=password,
95
                active=active,
96
                permissions=permissions,
97
                groups=groups,
98
                apikey=apikey,
99
                overwrite=overwrite
100
            )
101
        else:
102
            if username in self._users.keys() and not overwrite:
103
                raise UserAlreadyExists(username)
104
105
            if not permissions:
106
                permissions = []
107
            permissions = self._to_permissions(*permissions)
108
109
            if not groups:
110
                groups = self._group_manager.default_groups
111
            groups = self._to_groups(*groups)
112
113
            self._users[username] = LDAPUser(
114
                username=username,
115
                active=active,
116
                permissions=permissions,
117
                groups=groups,
118
                dn=dn,
119
                apikey=apikey
120
            )
121
            self._dirty = True
122
            self._save()
123
124
    def check_password(self, username, password):
125
        user = self.find_user(userid=username)
126
        if isinstance(user, LDAPUser):
127
            # in case group settings changed either in auth_ldap settings OR on LDAP directory
128
            if user.is_active and (
129
                self.settings.get([OU]) is None or
130
                len(self.refresh_ldap_group_memberships_for(user)) > 0
131
            ):
132
                self.logger.debug("Checking %s password via LDAP" % user.get_id())
133
                client = self.ldap.get_client(user.distinguished_name, password)
134
                authenticated = client is not None
135
                self.logger.debug("%s was %sauthenticated" % (user.get_name(), "" if authenticated else "not "))
136
                return authenticated
137
            else:
138
                self.logger.debug("%s is inactive or no longer a member of required groups" % user.get_id())
139
        else:
140
            self.logger.debug("Checking %s password via users.yaml" % user.get_name())
141
            return FilebasedUserManager.check_password(self, user.get_name(), password)
142
        return False
143
144
    def refresh_ldap_group_memberships_for(self, user):
145
        current_groups = self.group_manager.get_ldap_groups_for(user)
146
        cached_groups = list(filter(lambda g: isinstance(g, LDAPGroup), user.groups))
147
        self.remove_groups_from_user(user.get_id(), list(set(cached_groups) - set(current_groups)))
148
        self.add_groups_to_user(user.get_id(), list(set(current_groups) - set(cached_groups)))
149
        self.logger.debug("%s is currently a member of %s" % (user.get_id(), current_groups))
150
        return current_groups
151
152
    def refresh_ldap_group_memberships(self):
153
        for user in filter(lambda u: isinstance(u, LDAPUser), self.get_all_users()):
154
            self.refresh_ldap_group_memberships_for(user)
155
156
    def _load(self):
157
        if os.path.exists(self._userfile) and os.path.isfile(self._userfile):
158
            self._customized = True
159
            with io.open(self._userfile, 'rt', encoding='utf-8') as f:
160
                data = yaml.safe_load(f)
161
                for name, attributes in data.items():
162
                    permissions = self._to_permissions(*attributes.get("permissions", []))
163
                    groups = attributes.get("groups", {
164
                        self._group_manager.user_group  # the user group is mandatory for all logged in users
165
                    })
166
                    user_type = attributes.get("type", False)
167
168
                    # migrate from roles to permissions
169
                    if "roles" in attributes and "permissions" not in attributes:
170
                        self.logger.info("Migrating user %s to new granular permission system" % name)
171
                        groups |= set(self._migrate_roles_to_groups(attributes["roles"]))
172
                        self._dirty = True
173
174
                    # because this plugin used to use the groups field, need to wait to make sure it's safe to do this
175
                    groups = self._to_groups(*groups)
176
177
                    apikey = attributes.get("apikey")
178
                    user_settings = attributes.get("settings", dict())
179
180
                    if user_type == LDAPUser.USER_TYPE:
181
                        self.logger.debug("Loading %s as %s" % (name, LDAPUser.__name__))
182
                        self._users[name] = LDAPUser(
183
                            username=name,
184
                            active=attributes["active"],
185
                            permissions=permissions,
186
                            groups=groups,
187
                            dn=attributes[DISTINGUISHED_NAME],
188
                            apikey=apikey,
189
                            settings=user_settings
190
                        )
191
                    else:
192
                        self.logger.debug("Loading %s as %s" % (name, User.__name__))
193
                        self._users[name] = User(
194
                            username=name,
195
                            passwordHash=attributes["password"],
196
                            active=attributes["active"],
197
                            permissions=permissions,
198
                            groups=groups,
199
                            apikey=apikey,
200
                            settings=user_settings
201
                        )
202
                        for session_id in self._sessionids_by_userid.get(name, set()):
203
                            if session_id in self._session_users_by_session:
204
                                self._session_users_by_session[session_id].update_user(self._users[name])
205
206
            if self._dirty:
207
                self._save()
208
209
        else:
210
            self._customized = False
211
212
    def _save(self, force=False):
213
        if not self._dirty and not force:
214
            return
215
216
        data = {}
217
        for name, user in self._users.items():
218
            if not user or not isinstance(user, User):
219
                self.logger.debug('Not saving %s' % name)
220
                continue
221
222
            if isinstance(user, LDAPUser):
223
                self.logger.debug('Saving %s as %s' % (name, LDAPUser.__name__))
224
                data[name] = {
225
                    "type": LDAPUser.USER_TYPE,
226
                    DISTINGUISHED_NAME: user.distinguished_name,
227
228
                    # password field has to exist because of how FilebasedUserManager processes
229
                    # data, but an empty password hash cannot match any entered password (as
230
                    # whatever the user enters will be hashed... even an empty password.
231
                    "password": None,
232
233
                    "active": user._active,
234
                    "groups": self._from_groups(*user._groups),
235
                    "permissions": self._from_permissions(*user._permissions),
236
                    "apikey": user._apikey,
237
                    "settings": user._settings,
238
239
                    # TODO: deprecated, remove in 1.5.0
240
                    "roles": user._roles
241
                }
242
            else:
243
                self.logger.debug('Saving %s as %s...' % (name, User.__name__))
244
                data[name] = {
245
                    "password": user._passwordHash,
246
                    "active": user._active,
247
                    "groups": self._from_groups(*user._groups),
248
                    "permissions": self._from_permissions(*user._permissions),
249
                    "apikey": user._apikey,
250
                    "settings": user._settings,
251
252
                    # TODO: deprecated, remove in 1.5.0
253
                    "roles": user._roles
254
                }
255
256
        with atomic_write(self._userfile, mode='wt', permissions=0o600, max_permissions=0o666) as f:
257
            yaml.safe_dump(data, f, default_flow_style=False, indent=4, allow_unicode=True)
258
            self._dirty = False
259
        self._load()
260