Passed
Push — master ( 273471...ca35ef )
by Seth
01:24
created

octoprint_auth_ldap.user_manager   B

Complexity

Total Complexity 52

Size/Duplication

Total Lines 266
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 204
dl 0
loc 266
rs 7.44
c 0
b 0
f 0
wmc 52

12 Methods

Rating   Name   Duplication   Size   Complexity  
A LDAPUserManager.__init__() 0 4 1
A LDAPUserManager.group_manager() 0 3 1
B LDAPUserManager.add_user() 0 42 6
A LDAPUserManager._find_user_via_ldap() 0 31 4
B LDAPUserManager._save() 0 48 8
A LDAPUserManager._find_user_with_transformation() 0 10 5
B LDAPUserManager.check_password() 0 19 6
A LDAPUserManager.find_user() 0 7 3
A LDAPUserManager.refresh_ldap_group_memberships_for() 0 7 2
C LDAPUserManager._load() 0 54 11
A LDAPUserManager.refresh_ldap_group_memberships() 0 3 3
A LDAPUserManager._default_load() 0 6 2

How to fix   Complexity   

Complexity

Complex classes like octoprint_auth_ldap.user_manager often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# coding=utf-8
2
from __future__ import absolute_import
3
4
import io
5
import os
6
7
import yaml
8
from ldap.filter import filter_format
9
from octoprint.access.users import FilebasedUserManager, User, UserAlreadyExists
10
from octoprint.util import atomic_write
11
from octoprint_auth_ldap.constants import LOCAL_CACHE, SEARCH_FILTER, SEARCH_TERM_TRANSFORM, DISTINGUISHED_NAME, OU
12
from octoprint_auth_ldap.group import LDAPGroup
13
from octoprint_auth_ldap.group_manager import LDAPGroupManager
14
from octoprint_auth_ldap.ldap import LDAPConnection, DependentOnLDAPConnection
15
from octoprint_auth_ldap.tweaks import DependentOnSettingsPlugin
16
from octoprint_auth_ldap.user import LDAPUser
17
18
19
class LDAPUserManager(FilebasedUserManager, DependentOnSettingsPlugin, DependentOnLDAPConnection):
20
21
    def __init__(self, plugin, ldap: LDAPConnection, **kwargs):
22
        DependentOnSettingsPlugin.__init__(self, plugin)
23
        DependentOnLDAPConnection.__init__(self, ldap)
24
        FilebasedUserManager.__init__(self, group_manager=LDAPGroupManager(plugin=plugin, ldap=ldap), **kwargs)
25
26
    @property
27
    def group_manager(self) -> LDAPGroupManager:
28
        return self._group_manager
29
30
    def find_user(self, userid=None, apikey=None, session=None):
31
        self.logger.debug("Search for userid=%s, apiKey=%s, session=%s" % (userid, apikey, session))
32
        user = FilebasedUserManager.find_user(self, userid=userid, apikey=apikey, session=session)
33
        user, userid = self._find_user_with_transformation(apikey, session, user, userid)
34
        if not user and userid:
35
            user = self._find_user_via_ldap(user, userid)
36
        return user
37
38
    def _find_user_via_ldap(self, user, userid):
39
        self.logger.debug("User %s not found locally, treating as LDAP" % userid)
40
        search_filter = self.settings.get([SEARCH_FILTER])
41
        self.group_manager._refresh_ldap_groups()
42
        """
43
                operating on the wildly unsafe assumption that the admin who configures this plugin will have their head
44
                screwed on right and we are NOT escaping their search strings... only escaping unsafe user-entered text that
45
                is passed directly to search filters
46
                """
47
        ldap_user = self.ldap.search(filter_format(search_filter, (userid,)))
48
        if ldap_user is not None:
49
            self.logger.debug("User %s found as dn=%s" % (userid, ldap_user[DISTINGUISHED_NAME]))
50
            groups = self._group_manager.get_ldap_groups_for(ldap_user[DISTINGUISHED_NAME])
51
            if isinstance(groups, list):
52
                self.logger.debug("Creating new LDAPUser %s" % userid)
53
                if self.settings.get([LOCAL_CACHE]):
54
                    self.add_user(
55
                        username=userid,
56
                        dn=ldap_user[DISTINGUISHED_NAME],
57
                        groups=groups,
58
                        active=True
59
                    )
60
                    user = self._users[userid]
61
                else:
62
                    user = LDAPUser(
63
                        username=userid,
64
                        dn=ldap_user[DISTINGUISHED_NAME],
65
                        groups=groups,
66
                        active=True
67
                    )
68
        return user
69
70
    def _find_user_with_transformation(self, apikey, session, user, userid):
71
        transformation = self.settings.get([SEARCH_TERM_TRANSFORM])
72
        if not user and userid and transformation:
73
            self.logger.debug("Transforming %s using %s" % (userid, transformation))
74
            transformed = getattr(str, transformation)(str(userid))
75
            self.logger.debug("Search for user userid=%s" % transformed)
76
            if transformed != userid:
77
                userid = transformed
78
                user = FilebasedUserManager.find_user(self, userid=userid, apikey=apikey, session=session)
79
        return user, userid
80
81
    def add_user(self,
82
                 username,
83
                 password=None,
84
                 active=False,
85
                 permissions=None,
86
                 groups=None,
87
                 apikey=None,
88
                 overwrite=False,
89
                 dn=None):
90
        if dn is None:
91
            FilebasedUserManager.add_user(
92
                self,
93
                username=username,
94
                password=password,
95
                active=active,
96
                permissions=permissions,
97
                groups=groups,
98
                apikey=apikey,
99
                overwrite=overwrite
100
            )
101
        else:
102
            if username in self._users.keys() and not overwrite:
103
                raise UserAlreadyExists(username)
104
105
            if not permissions:
106
                permissions = []
107
            permissions = self._to_permissions(*permissions)
108
109
            if not groups:
110
                groups = self._group_manager.default_groups
111
            groups = self._to_groups(*groups)
112
113
            self._users[username] = LDAPUser(
114
                username=username,
115
                active=active,
116
                permissions=permissions,
117
                groups=groups,
118
                dn=dn,
119
                apikey=apikey
120
            )
121
            self._dirty = True
122
            self._save()
123
124
    def check_password(self, username, password):
125
        user = self.find_user(userid=username)
126
        if isinstance(user, LDAPUser):
127
            # in case group settings changed either in auth_ldap settings OR on LDAP directory
128
            if user.is_active and (
129
                    self.settings.get([OU]) is None or
130
                    len(self.refresh_ldap_group_memberships_for(user)) > 0
131
            ):
132
                self.logger.debug("Checking %s password via LDAP" % user.get_id())
133
                client = self.ldap.get_client(user.distinguished_name, password)
134
                authenticated = client is not None
135
                self.logger.debug("%s was %sauthenticated" % (user.get_name(), "" if authenticated else "not "))
136
                return authenticated
137
            else:
138
                self.logger.debug("%s is inactive or no longer a member of required groups" % user.get_id())
139
        else:
140
            self.logger.debug("Checking %s password via users.yaml" % user.get_name())
141
            return FilebasedUserManager.check_password(self, user.get_name(), password)
142
        return False
143
144
    def refresh_ldap_group_memberships_for(self, user):
145
        current_groups = self.group_manager.get_ldap_groups_for(user)
146
        cached_groups = list(filter(lambda g: isinstance(g, LDAPGroup), user.groups))
147
        self.remove_groups_from_user(user.get_id(), list(set(cached_groups) - set(current_groups)))
148
        self.add_groups_to_user(user.get_id(), list(set(current_groups) - set(cached_groups)))
149
        self.logger.debug("%s is currently a member of %s" % (user.get_id(), current_groups))
150
        return current_groups
151
152
    def refresh_ldap_group_memberships(self):
153
        for user in filter(lambda u: isinstance(u, LDAPUser), self.get_all_users()):
154
            self.refresh_ldap_group_memberships_for(user)
155
156
    @staticmethod
157
    def _default_load(attributes, key, default=None):
158
        if key in attributes:
159
            return attributes[key]
160
        else:
161
            return default
162
163
    def _load(self):
164
        if os.path.exists(self._userfile) and os.path.isfile(self._userfile):
165
            self._customized = True
166
            with io.open(self._userfile, 'rt', encoding='utf-8') as f:
167
                data = yaml.safe_load(f)
168
                for name, attributes in data.items():
169
                    permissions = self._to_permissions(self._default_load(attributes, "permissions", []))
170
                    groups = self._default_load(attributes, "groups", {
171
                        self._group_manager.user_group})  # the user group is mandatory for all logged in users
172
                    user_type = self._default_load(attributes, "type", False)
173
174
                    # migrate from roles to permissions
175
                    if "roles" in attributes and "permissions" not in attributes:
176
                        self.logger.info("Migrating user %s to new granular permission system" % name)
177
                        groups |= set(self._migrate_roles_to_groups(attributes["roles"]))
178
                        self._dirty = True
179
180
                    # because this plugin used to use the groups field, need to wait to make sure it's safe to do this
181
                    groups = self._to_groups(*groups)
182
183
                    apikey = self._default_load(attributes, "apikey")
184
                    user_settings = self._default_load(attributes, "settings", dict())
185
186
                    if user_type == LDAPUser.USER_TYPE:
187
                        self.logger.debug("Loading %s as %s" % (name, LDAPUser.__name__))
188
                        self._users[name] = LDAPUser(
189
                            username=name,
190
                            active=attributes["active"],
191
                            permissions=permissions,
192
                            groups=groups,
193
                            dn=attributes[DISTINGUISHED_NAME],
194
                            apikey=apikey,
195
                            settings=user_settings
196
                        )
197
                    else:
198
                        self.logger.debug("Loading %s as %s" % (name, User.__name__))
199
                        self._users[name] = User(
200
                            username=name,
201
                            passwordHash=attributes["password"],
202
                            active=attributes["active"],
203
                            permissions=permissions,
204
                            groups=groups,
205
                            apikey=apikey,
206
                            settings=user_settings
207
                        )
208
                        for session_id in self._sessionids_by_userid.get(name, set()):
209
                            if session_id in self._session_users_by_session:
210
                                self._session_users_by_session[session_id].update_user(self._users[name])
211
212
            if self._dirty:
213
                self._save()
214
215
        else:
216
            self._customized = False
217
218
    def _save(self, force=False):
219
        if not self._dirty and not force:
220
            return
221
222
        data = {}
223
        for name, user in self._users.items():
224
            if not user or not isinstance(user, User):
225
                self.logger.debug('Not saving %s' % name)
226
                continue
227
228
            if isinstance(user, LDAPUser):
229
                self.logger.debug('Saving %s as %s' % (name, LDAPUser.__name__))
230
                data[name] = {
231
                    "type": LDAPUser.USER_TYPE,
232
                    DISTINGUISHED_NAME: user.distinguished_name,
233
234
                    # password field has to exist because of how FilebasedUserManager processes
235
                    # data, but an empty password hash cannot match any entered password (as
236
                    # whatever the user enters will be hashed... even an empty password.
237
                    "password": None,
238
239
                    "active": user._active,
240
                    "groups": self._from_groups(*user._groups),
241
                    "permissions": self._from_permissions(*user._permissions),
242
                    "apikey": user._apikey,
243
                    "settings": user._settings,
244
245
                    # TODO: deprecated, remove in 1.5.0
246
                    "roles": user._roles
247
                }
248
            else:
249
                self.logger.debug('Saving %s as %s...' % (name, User.__name__))
250
                data[name] = {
251
                    "password": user._passwordHash,
252
                    "active": user._active,
253
                    "groups": self._from_groups(*user._groups),
254
                    "permissions": self._from_permissions(*user._permissions),
255
                    "apikey": user._apikey,
256
                    "settings": user._settings,
257
258
                    # TODO: deprecated, remove in 1.5.0
259
                    "roles": user._roles
260
                }
261
262
        with atomic_write(self._userfile, mode='wt', permissions=0o600, max_permissions=0o666) as f:
263
            yaml.safe_dump(data, f, default_flow_style=False, indent=4, allow_unicode=True)
264
            self._dirty = False
265
        self._load()
266