octoprint_auth_ldap.ldap   A
last analyzed

Complexity

Total Complexity 19

Size/Duplication

Total Lines 94
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 70
dl 0
loc 94
rs 10
c 0
b 0
f 0
wmc 19

6 Methods

Rating   Name   Duplication   Size   Complexity  
A LDAPConnection.get_ou_memberships_for() 0 18 4
A DependentOnLDAPConnection.__init__() 0 2 1
A LDAPConnection.search() 0 21 5
B LDAPConnection.get_client() 0 27 7
A LDAPConnection.__init__() 0 2 1
A DependentOnLDAPConnection.ldap() 0 3 1
1
# coding=utf-8
2
from __future__ import absolute_import
3
4
import json
5
6
import ldap
7
from octoprint_auth_ldap.constants import AUTH_PASSWORD, AUTH_USER, DISTINGUISHED_NAME, OU, OU_FILTER, OU_MEMBER_FILTER, \
8
    REQUEST_TLS_CERT, SEARCH_BASE, URI
9
from octoprint_auth_ldap.tweaks import DependentOnSettingsPlugin
10
11
12
class LDAPConnection(DependentOnSettingsPlugin):
13
    def __init__(self, plugin):
14
        DependentOnSettingsPlugin.__init__(self, plugin)
15
16
    def get_client(self, user=None, password=None):
17
        uri = self.settings.get([URI])
18
        if not uri:
19
            self.logger.debug("No LDAP URI")
20
            return None
21
22
        if not user:
23
            user = self.settings.get([AUTH_USER])
24
            password = self.settings.get([AUTH_PASSWORD])
25
26
        try:
27
            self.logger.debug("Initializing LDAP connection to %s" % uri)
28
            client = ldap.initialize(uri)
29
            if self.settings.get([REQUEST_TLS_CERT]):
30
                self.logger.debug("Requesting TLS certificate")
31
                client.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND)
32
            else:
33
                client.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
34
            if user is not None:
35
                self.logger.debug("Binding to LDAP as %s" % user)
36
                client.bind_s(user, password)
37
            return client
38
        except ldap.INVALID_CREDENTIALS:
39
            self.logger.error("Invalid credentials to bind to LDAP as %s" % user)
40
        except ldap.LDAPError as e:
41
            self.logger.error(json.dumps(e))
42
        return None
43
44
    def search(self, ldap_filter, base=None, scope=ldap.SCOPE_SUBTREE):
45
        if not base:
46
            base = self.settings.get([SEARCH_BASE])
47
        try:
48
            client = self.get_client()
49
            if client is not None:
50
                # self.logger.debug("Searching LDAP, base: %s and filter: %s" % (base, ldap_filter))
51
                result = client.search_s(base, scope, ldap_filter)
52
                client.unbind_s()
53
                if result:
54
                    dn, data = result[0]
55
                    """
56
                    # Dump LDAP search query results to logger
57
                    self.logger.debug("dn: %s" % dn)
58
                    for key, value in data.items():
59
                        self.logger.debug("%s: %s" % (key, value))
60
                    """
61
                    return dict(dn=dn, data=data)
62
        except ldap.LDAPError as e:
63
            self.logger.error(json.dumps(e))
64
        return None
65
66
    def get_ou_memberships_for(self, dn):
67
        memberships = []
68
69
        ou_common_names = self.settings.get([OU])
70
        if ou_common_names is None:
71
            return False
72
73
        ou_filter = self.settings.get([OU_FILTER])
74
        ou_member_filter = self.settings.get([OU_MEMBER_FILTER])
75
        for ou_common_name in str(ou_common_names).split(","):
76
            result = self.search("(&" +
77
                                 "(" + ou_filter % ou_common_name.strip() + ")" +
78
                                 "(" + (ou_member_filter % dn) + ")" +
79
                                 ")")
80
            if result is not None:
81
                self.logger.debug("%s is a member of %s" % (dn, result[DISTINGUISHED_NAME]))
82
                memberships.append(ou_common_name)
83
        return memberships
84
85
86
class DependentOnLDAPConnection:
87
    # noinspection PyShadowingNames
88
    def __init__(self, ldap):
89
        self._ldap = ldap
90
91
    @property
92
    def ldap(self):
93
        return self._ldap
94