Completed
Push — master ( 2f10ce...f02378 )
by Pat
01:02
created

chezbetty.models.User   A

Complexity

Total Complexity 31

Size/Duplication

Total Lines 161
Duplicated Lines 0 %
Metric Value
dl 0
loc 161
rs 9.8
wmc 31

21 Methods

Rating   Name   Duplication   Size   Complexity  
A get_user_count_cumulative() 0 6 1
A __str__() 0 3 1
A password() 0 9 1
A __init__() 0 6 1
A get_admins() 0 5 1
A has_password() 0 3 1
A from_umid() 0 11 3
A get_amount_owed() 0 5 1
A all() 0 5 1
A get_shame_users() 0 6 1
A deposit_limit() 0 6 3
A from_uniqname() 0 7 3
A get_users_total() 0 4 1
A random_password() 0 6 2
A check_password() 0 6 2
A get_details() 0 2 1
A from_fuzzy() 0 7 1
A from_id() 0 3 1
A __make_salt() 0 2 1
A count() 0 4 1
A get_amount_held() 0 5 1
1
from pyramid.security import authenticated_userid
2
import hashlib
3
import binascii
4
import os
5
from .model import *
6
from . import account
7
from . import event
8
from chezbetty import utility
9
10
import ldap3
11
import random
12
import string
13
14
class InvalidUserException(Exception):
    """Raised when a requested user cannot be found.

    In this module it is raised by the LDAP lookup when the directory returns
    no entries, and by ``User.from_umid`` when the UMID is unknown locally and
    the caller did not ask for the user to be created.
    """
16
17
18
class LDAPLookup(object):
    """Look up an individual in the UM directory over LDAP.

    Lookups are keyed on Michigan ID number (the ``entityid`` attribute) or
    on uniqname (the ``uid`` attribute).

    ``SERVER``, ``USERNAME`` and ``PASSWORD`` default to ``None`` here; they
    are presumably assigned by application configuration before the first
    lookup (that code is not in this file -- TODO confirm).
    """

    SERVER = None
    USERNAME = None
    BASE_DN = "ou=People,dc=umich,dc=edu"
    PASSWORD = None
    # Attributes fetched for a basic lookup.
    ATTRIBUTES = ["uid", "entityid", "displayName"]
    # Extra attributes fetched for a detailed lookup.
    # http://www.itcs.umich.edu/itcsdocs/r1463/attributes-for-ldap.html
    DETAILS_ATTRIBUTES = [
            # Could be interesting but require extra perm's from ITS
            #"umichInstRoles",
            #"umichAlumStatus",
            #"umichAAAcadProgram",
            #"umichAATermStatus",
            #"umichHR",
            "notice",
            "ou",
            "umichDescription",
            "umichTitle",
            ]

    # Characters that RFC 4515 requires to be escaped inside a filter value.
    # The backslash must be handled first so the escapes themselves are not
    # escaped a second time.
    _FILTER_ESCAPES = (
        ("\\", "\\5c"),
        ("*", "\\2a"),
        ("(", "\\28"),
        (")", "\\29"),
        ("\x00", "\\00"),
    )

    def __init__(self):
        # The connection is created lazily on the first lookup.
        self.__conn = None

    @classmethod
    def _build_filter(cls, k, v):
        """Return the LDAP equality filter ``(k=v)`` with ``v`` escaped.

        ``v`` comes from callers (a uniqname or UMID) and is treated as
        untrusted: without escaping, a value such as ``*`` would match every
        entry and ``)(`` could rewrite the filter.
        """
        value = str(v)
        for raw, escaped in cls._FILTER_ESCAPES:
            value = value.replace(raw, escaped)
        return "(%s=%s)" % (k, value)

    def __connect(self):
        """Open and bind the LDAP connection if there is not one already."""
        if not self.__conn:
            s = ldap3.Server(self.SERVER, port=636, use_ssl=True, get_info=ldap3.GET_ALL_INFO)
            self.__conn = ldap3.Connection(s, auto_bind=True,
                    user=self.USERNAME, password=self.PASSWORD,
                    client_strategy=ldap3.STRATEGY_SYNC,
                    authentication=ldap3.AUTH_SIMPLE
            )

    def __search(self, query, attributes):
        """Run ``query`` under ``BASE_DN`` on the current connection."""
        self.__conn.search(self.BASE_DN,
                query,
                ldap3.SEARCH_SCOPE_WHOLE_SUBTREE,
                attributes=attributes
        )

    def __do_lookup(self, k, v, attributes, full_dict=False):
        """Search for the entry whose attribute ``k`` equals ``v``.

        Returns the first entry's raw attribute dict when ``full_dict`` is
        true, otherwise a dict with the keys ``umid``, ``uniqname`` and
        ``name``.  Raises ``InvalidUserException`` when nothing matches.
        """
        self.__connect()
        query = self._build_filter(k, v)
        try:
            self.__search(query, attributes)
        except Exception:
            # sometimes our connections time out: drop the connection and
            # retry exactly once on a fresh one.  A second failure propagates.
            self.__conn = None
            self.__connect()
            self.__search(query, attributes)

        response = self.__conn.response
        if not response:
            raise InvalidUserException()

        found = response[0]["attributes"]
        if full_dict:
            return found

        # NOTE(review): "entityid" is used as returned while the other two
        # take element [0] -- kept exactly as the original code had it.
        return {
            "umid": found["entityid"],
            "uniqname": found["uid"][0],
            "name": found["displayName"][0],
        }

    def __lookup(self, k, v):
        """Basic lookup returning the ``umid``/``uniqname``/``name`` dict."""
        return self.__do_lookup(k, v, self.ATTRIBUTES)

    def __detail_lookup(self, k, v):
        """Detailed lookup returning the entry's full attribute dict."""
        return self.__do_lookup(k, v,
                self.ATTRIBUTES + self.DETAILS_ATTRIBUTES,
                full_dict=True,
                )

    def lookup_umid(self, umid, details=False):
        """Look a person up by Michigan ID number (``entityid``)."""
        if details:
            return self.__detail_lookup("entityid", umid)
        else:
            return self.__lookup("entityid", umid)

    def lookup_uniqname(self, uniqname, details=False):
        """Look a person up by uniqname (``uid``)."""
        if details:
            return self.__detail_lookup("uid", uniqname)
        else:
            return self.__lookup("uid", uniqname)
105
106
107
class User(account.Account):
    """A user account, identified by uniqname and (optionally) UMID.

    Rows live in the ``users`` table and are the ``'user'`` polymorphic
    identity of ``account.Account``.  ``name``, ``balance``, ``created_at``,
    ``total_deposits`` and ``total_purchases`` are used below but not defined
    here; presumably they come from the parent class -- TODO confirm.

    Passwords are stored as the hex SHA-256 digest of ``salt + password``.
    NOTE(review): a single SHA-256 round is weak for password storage, but
    changing the scheme would invalidate every stored hash, so it is kept.
    """
    __tablename__ = 'users'
    __mapper_args__ = {'polymorphic_identity': 'user'}

    id        = Column(Integer, ForeignKey("accounts.id"), primary_key=True)
    uniqname  = Column(String(8), nullable=False, unique=True)
    umid      = Column(String(8), unique=True)
    _password = Column("password", String(255))
    _salt     = Column("salt", String(255))
    enabled   = Column(Boolean, nullable=False, default=True)
    role      = Column(Enum("user", "serviceaccount", "manager", "administrator", name="user_type"),
                       nullable=False, default="user")

    administrative_events = relationship(event.Event, foreign_keys=[event.Event.user_id], backref="admin")
    events_deleted        = relationship(event.Event, foreign_keys=[event.Event.deleted_user_id], backref="deleted_user")
    # One directory client shared by every User (class attribute).
    __ldap = LDAPLookup()

    def __init__(self, uniqname, umid, name):
        """Create an enabled user with a zero balance."""
        self.enabled = True
        self.uniqname = uniqname
        self.umid = umid
        self.name = name
        self.balance = 0.0

    def __str__(self):
        return "<User: id {}, uniqname {}, umid {}, name {}, balance {}>".\
                format(self.id, self.uniqname, self.umid, self.name, self.balance)

    @classmethod
    def from_id(cls, id):
        """Return the user with primary key ``id``.

        Uses ``.one()``, so the query raises unless exactly one row matches.
        """
        return DBSession.query(cls).filter(cls.id == id).one()

    def get_details(self):
        """Return the detailed LDAP attribute dict for this user's uniqname."""
        return self.__ldap.lookup_uniqname(self.uniqname, details=True)

    @classmethod
    def from_uniqname(cls, uniqname, local_only=False):
        """Return the user with this uniqname.

        If there is no local row and ``local_only`` is false, the user is
        looked up in LDAP, created and added to the session.  With
        ``local_only`` true the result may be ``None``.
        """
        u = DBSession.query(cls).filter(cls.uniqname == uniqname).first()
        if not u and not local_only:
            u = cls(**cls.__ldap.lookup_uniqname(uniqname))
            DBSession.add(u)
        return u

    @classmethod
    def from_umid(cls, umid, create_if_never_seen=False):
        """Return the user with this UMID.

        If there is no local row and ``create_if_never_seen`` is true, the
        user is looked up in LDAP, created, added to the session and passed
        to ``utility.new_user_email``.  Otherwise ``InvalidUserException``
        is raised.
        """
        u = DBSession.query(cls).filter(cls.umid == umid).first()
        if not u:
            if create_if_never_seen:
                u = cls(**cls.__ldap.lookup_umid(umid))
                DBSession.add(u)
                utility.new_user_email(u)
            else:
                raise InvalidUserException()
        return u

    @classmethod
    def from_fuzzy(cls, search_str):
        """Return users whose uniqname, UMID or name contains ``search_str``
        (case-insensitive ``ILIKE`` substring match)."""
        pattern = '%{}%'.format(search_str)
        return DBSession.query(cls)\
                        .filter(or_(
                            cls.uniqname.ilike(pattern),
                            cls.umid.ilike(pattern),
                            cls.name.ilike(pattern)
                        )).all()

    @classmethod
    def all(cls):
        """Return all enabled users ordered by name."""
        return DBSession.query(cls)\
                        .filter(cls.enabled)\
                        .order_by(cls.name)\
                        .all()

    @classmethod
    def count(cls):
        """Return the number of users that are not service accounts."""
        return DBSession.query(func.count(cls.id).label('c'))\
                        .filter(cls.role != 'serviceaccount')\
                        .one().c

    @classmethod
    def get_admins(cls):
        """Return all enabled users whose role is ``administrator``."""
        return DBSession.query(cls)\
                        .filter(cls.enabled)\
                        .filter(cls.role=='administrator').all()

    @classmethod
    def get_shame_users(cls):
        """Return enabled users with a balance below -5, most negative first."""
        return DBSession.query(cls)\
                        .filter(cls.enabled)\
                        .filter(cls.balance < -5)\
                        .order_by(cls.balance).all()

    @classmethod
    def get_users_total(cls):
        """Return the sum of all user balances (``Decimal(0.0)`` if empty)."""
        return DBSession.query(func.sum(User.balance).label("total_balance"))\
                        .one().total_balance or Decimal(0.0)

    # Sum the total amount of money in user accounts that we are holding for
    # users. This is different from just getting the total because it doesn't
    # count users with negative balances
    @classmethod
    def get_amount_held(cls):
        """Return the sum of the positive user balances."""
        return DBSession.query(func.sum(User.balance).label("total_balance"))\
                        .filter(User.balance>0)\
                        .one().total_balance or Decimal(0.0)

    @classmethod
    def get_amount_owed(cls):
        """Return the sum of the negative user balances (a value <= 0)."""
        return DBSession.query(func.sum(User.balance).label("total_balance"))\
                        .filter(User.balance<0)\
                        .one().total_balance or Decimal(0.0)

    @classmethod
    def get_user_count_cumulative(cls):
        """Return ``utility.timeseries_cumulative`` over the users' creation
        times, in ascending order."""
        rows = DBSession.query(cls.created_at)\
                        .order_by(cls.created_at)\
                        .all()
        return utility.timeseries_cumulative(rows)

    def __make_salt(self):
        """Return a fresh random salt as an ASCII string.

        32 random bytes are base64-encoded; the trailing newline, the ``=``
        padding and one further character are sliced off, as before.
        ``os.urandom`` replaces reading ``/dev/urandom`` by hand, which never
        closed the file.
        """
        return binascii.b2a_base64(os.urandom(32))[:-3].decode("ascii")

    @staticmethod
    def __hash_password(salt, password):
        """Return the hex SHA-256 digest of ``salt + password``."""
        return hashlib.sha256((salt + password).encode('utf-8')).hexdigest()

    def __set_password(self, password):
        """Generate a new salt and store the salted hash of ``password``."""
        self._salt = self.__make_salt()
        self._password = self.__hash_password(self._salt, password)

    @hybrid_property
    def password(self):
        """The stored password hash (never the plaintext)."""
        return self._password

    @password.setter
    def password(self, password):
        if password == '':
            # Use this to clear the password so the user can't login
            self._password = None
        else:
            self.__set_password(password)

    def random_password(self):
        """Set a random 6-character alphanumeric password and return it.

        Characters are drawn from the OS entropy source via
        ``random.SystemRandom`` rather than the predictable default
        generator, since the result is a credential.
        """
        alphabet = string.ascii_letters + string.digits
        rng = random.SystemRandom()
        password = ''.join(rng.choice(alphabet) for _ in range(6))
        self.__set_password(password)
        return password

    def check_password(self, cand):
        """Return True if ``cand`` matches the stored password.

        Returns False when no salt is stored or the password was cleared.
        """
        # Local import: hmac is not among this module's file-level imports.
        import hmac

        if not self._salt or self._password is None:
            return False
        c = self.__hash_password(self._salt, cand)
        # Constant-time comparison so the check does not leak how many
        # leading characters of the digest matched.
        return hmac.compare_digest(c, self._password)

    @property
    def has_password(self):
        """True if a password hash is stored for this user."""
        return self._password is not None

    # Cash deposit limit is a quick check on how active the user has been.
    # We basically don't want new users to play around with seeing how much
    # cash they can deposit.
    @property
    def deposit_limit(self):
        """100.0 once both total deposits and total purchases exceed 10.0,
        otherwise 20.0."""
        if self.total_deposits > 10.0 and self.total_purchases > 10.0:
            return 100.0
        else:
            return 20.0
268
269
270
def get_user(request):
    """Return the ``User`` for the request's authenticated userid.

    Returns ``None`` when the request carries no authenticated userid.
    Otherwise the user is fetched by uniqname with ``.one()``, so the query
    raises unless exactly one row matches.
    """
    uniqname = authenticated_userid(request)
    if uniqname:
        return DBSession.query(User).filter(User.uniqname == uniqname).one()
    return None
275
276
277
# This is in a stupid place due to circular import problems: the property
# needs User, so it is attached to Event from here.
def __user_from_foreign_key(self):
    """Return the ``User`` whose id equals this object's ``user_id``."""
    return User.from_id(self.user_id)
__user_from_foreign_key = property(__user_from_foreign_key)
event.Event.user = __user_from_foreign_key
282
283
284
def groupfinder(userid, request):
    """Return the list of group names granted to ``userid``'s role.

    The user is resolved with ``User.from_uniqname``.  An unrecognised role
    yields ``None``.
    """
    groups_by_role = {
        "user": ("user",),
        "manager": ("user", "manager"),
        "administrator": ("user", "manager", "admin", "serviceaccount"),
        "serviceaccount": ("serviceaccount",),
    }
    role = User.from_uniqname(userid).role
    granted = groups_by_role.get(role)
    if granted is None:
        return None
    # Hand back a fresh list on every call, as the if/elif chain did.
    return list(granted)
294