Completed
Push — master ( d2aebd...b0ae7e )
by
unknown
01:01
created

chezbetty.models.User   A

Complexity

Total Complexity 34

Size/Duplication

Total Lines 184
Duplicated Lines 0 %
Metric Value
dl 0
loc 184
rs 9.2
wmc 34

24 Methods

Rating   Name   Duplication   Size   Complexity  
A User.from_uniqname() 0 7 3
A User.all() 0 5 1
A User.count() 0 6 1
A User.get_archived_users() 0 6 1
A User.__make_salt() 0 2 1
A User.from_id() 0 3 1
A User.check_password() 0 6 2
A User.get_details() 0 2 1
A User.get_shame_users() 0 6 1
A User.random_password() 0 6 2
A User.password() 0 3 2
A User.get_normal_users() 0 6 1
A User.__str__() 0 3 1
A User.has_password() 0 3 1
A User.get_user_count_cumulative() 0 6 1
A User.__init__() 0 6 1
A User.get_users_total() 0 4 1
A User.from_fuzzy() 0 7 1
A User.get_amount_owed() 0 5 1
A User.get_disabled_users() 0 5 1
A User.get_admins() 0 5 1
A User.get_amount_held() 0 5 1
A User.deposit_limit() 0 6 3
A User.from_umid() 0 11 3
1
from pyramid.security import authenticated_userid
2
import hashlib
3
import binascii
4
import os
5
from .model import *
6
from . import account
7
from . import event
8
from chezbetty import utility
9
10
import ldap3
11
import random
12
import string
13
14
class InvalidUserException(Exception):
    """Signal that a requested user could not be resolved.

    Raised in this module when an LDAP search returns no entries, and by
    ``User.from_umid`` when the UMID is not in the local database and the
    caller did not ask for the user to be created.
    """
16
17
18
class LDAPLookup(object):
    """Look up an individual in the UM LDAP directory.

    Lookups are keyed on either the Michigan ID number (the ``entityid``
    attribute) or the uniqname (the ``uid`` attribute).

    ``SERVER``, ``USERNAME`` and ``PASSWORD`` are ``None`` here and are read
    when the first connection is opened; presumably they are filled in by
    application configuration elsewhere -- confirm against the caller.
    """

    SERVER = None
    USERNAME = None
    BASE_DN = "ou=People,dc=umich,dc=edu"
    PASSWORD = None
    ATTRIBUTES = ["uid", "entityid", "displayName"]
    # http://www.itcs.umich.edu/itcsdocs/r1463/attributes-for-ldap.html
    DETAILS_ATTRIBUTES = [
            # Could be interesting but require extra perm's from ITS
            #"umichInstRoles",
            #"umichAlumStatus",
            #"umichAAAcadProgram",
            #"umichAATermStatus",
            #"umichHR",
            "notice",
            "ou",
            "umichDescription",
            "umichTitle",
            ]

    # RFC 4515 escapes for characters that are special inside a search
    # filter. The backslash MUST be replaced first, otherwise the
    # backslashes introduced by the later replacements get escaped again.
    _FILTER_ESCAPES = (
        ("\\", "\\5c"),
        ("*", "\\2a"),
        ("(", "\\28"),
        (")", "\\29"),
        ("\x00", "\\00"),
    )

    def __init__(self):
        # Connection is opened lazily on the first lookup.
        self.__conn = None

    @classmethod
    def _escape_filter_value(cls, value):
        """Return ``value`` as text that is safe to embed in an LDAP filter.

        ``value`` is formatted with ``%s`` (so integers are accepted, as
        before) and filter metacharacters are replaced with their RFC 4515
        hex escapes. Values without special characters come back unchanged.
        """
        text = "%s" % (value,)
        for char, replacement in cls._FILTER_ESCAPES:
            text = text.replace(char, replacement)
        return text

    def __connect(self):
        """Open and bind the LDAP connection unless one already exists."""
        if not self.__conn:
            s = ldap3.Server(self.SERVER, port=636, use_ssl=True, get_info=ldap3.GET_ALL_INFO)
            self.__conn = ldap3.Connection(s, auto_bind=True,
                    user=self.USERNAME, password=self.PASSWORD,
                    client_strategy=ldap3.STRATEGY_SYNC,
                    authentication=ldap3.AUTH_SIMPLE
            )

    def __search(self, query, attributes):
        """Run ``query`` under ``BASE_DN`` on the current connection."""
        self.__conn.search(self.BASE_DN,
                query,
                ldap3.SEARCH_SCOPE_WHOLE_SUBTREE,
                attributes=attributes
        )

    def __do_lookup(self, k, v, attributes, full_dict=False):
        """Search for the entry whose attribute ``k`` equals ``v``.

        :param k: LDAP attribute name to match on (trusted, not escaped).
        :param v: value to match; escaped before being placed in the filter.
        :param attributes: list of attribute names to request.
        :param full_dict: when true, return the first entry's raw
            ``attributes`` mapping instead of the trimmed-down dict.
        :returns: the first matching entry, either as its raw attribute
            mapping or as ``{"umid", "uniqname", "name"}``.
        :raises InvalidUserException: if the search returned no entries.
        """
        self.__connect()
        query = "(%s=%s)" % (k, self._escape_filter_value(v))
        try:
            self.__search(query, attributes)
        except Exception:
            # sometimes our connections time out; reconnect and retry once.
            # A failure of the retry propagates to the caller.
            self.__conn = None
            self.__connect()
            self.__search(query, attributes)

        response = self.__conn.response
        if not response:
            raise InvalidUserException()

        entry = response[0]["attributes"]
        if full_dict:
            return entry

        return {
            # NOTE(review): entityid is used as-is while uid/displayName take
            # element [0]; presumably entityid is single-valued -- confirm.
            "umid": entry["entityid"],
            "uniqname": entry["uid"][0],
            "name": entry["displayName"][0]
        }

    def __lookup(self, k, v):
        """Basic lookup returning the ``umid``/``uniqname``/``name`` dict."""
        return self.__do_lookup(k, v, self.ATTRIBUTES)

    def __detail_lookup(self, k, v):
        """Lookup that also requests ``DETAILS_ATTRIBUTES``; returns raw attributes."""
        return self.__do_lookup(k, v,
                self.ATTRIBUTES + self.DETAILS_ATTRIBUTES,
                full_dict=True,
                )

    def lookup_umid(self, umid, details=False):
        """Look a person up by Michigan ID number (``entityid``).

        With ``details`` true the raw attribute mapping (including the
        detail attributes) is returned; otherwise the trimmed-down dict.
        Raises ``InvalidUserException`` when nothing matches.
        """
        if details:
            return self.__detail_lookup("entityid", umid)
        else:
            return self.__lookup("entityid", umid)

    def lookup_uniqname(self, uniqname, details=False):
        """Look a person up by uniqname (``uid``).

        With ``details`` true the raw attribute mapping (including the
        detail attributes) is returned; otherwise the trimmed-down dict.
        Raises ``InvalidUserException`` when nothing matches.
        """
        if details:
            return self.__detail_lookup("uid", uniqname)
        else:
            return self.__lookup("uid", uniqname)
105
106
107
class User(account.Account):
    """A person's account, stored in the ``users`` table.

    Extends ``account.Account`` (polymorphic identity ``'user'``) with the
    uniqname/UMID identifiers, an optional salted SHA-256 password, the
    enabled/archived flags and a role. Users missing from the local database
    can be created on demand from the LDAP directory.
    """

    __tablename__ = 'users'
    __mapper_args__ = {'polymorphic_identity': 'user'}

    id        = Column(Integer, ForeignKey("accounts.id"), primary_key=True)
    uniqname  = Column(String(8), nullable=False, unique=True)
    umid      = Column(String(8), unique=True)
    _password = Column("password", String(255))
    _salt     = Column("salt", String(255))
    enabled   = Column(Boolean, nullable=False, default=True)
    archived  = Column(Boolean, nullable=False, default=False)
    role      = Column(Enum("user", "serviceaccount", "manager", "administrator", name="user_type"),
                       nullable=False, default="user")

    administrative_events = relationship(event.Event, foreign_keys=[event.Event.user_id], backref="admin")
    events_deleted        = relationship(event.Event, foreign_keys=[event.Event.deleted_user_id], backref="deleted_user")
    # Shared directory client used to create users that are not yet local.
    __ldap = LDAPLookup()

    def __init__(self, uniqname, umid, name):
        """Create an enabled user with a zero balance."""
        self.enabled = True
        self.uniqname = uniqname
        self.umid = umid
        self.name = name
        self.balance = 0.0

    def __str__(self):
        return "<User: id {}, uniqname {}, umid {}, name {}, balance {}>".\
                format(self.id, self.uniqname, self.umid, self.name, self.balance)

    @classmethod
    def from_id(cls, id):
        """Return the user with primary key ``id``.

        Uses ``.one()``, so a missing (or duplicated) row raises instead of
        returning ``None``.
        """
        return DBSession.query(cls).filter(cls.id == id).one()

    def get_details(self):
        """Return the raw LDAP attribute mapping for this user's uniqname."""
        return self.__ldap.lookup_uniqname(self.uniqname, details=True)

    @classmethod
    def from_uniqname(cls, uniqname, local_only=False):
        """Return the user with this uniqname.

        If no local row exists and ``local_only`` is false, the user is
        looked up in LDAP, created and added to the session (the lookup
        raises ``InvalidUserException`` when the directory has no match).
        With ``local_only`` true a missing user yields ``None``.
        """
        u = DBSession.query(cls).filter(cls.uniqname == uniqname).first()
        if not u and not local_only:
            u = cls(**cls.__ldap.lookup_uniqname(uniqname))
            DBSession.add(u)
        return u

    @classmethod
    def from_umid(cls, umid, create_if_never_seen=False):
        """Return the user with this UMID.

        If no local row exists and ``create_if_never_seen`` is true, the
        user is looked up in LDAP, created, added to the session and passed
        to ``utility.new_user_email``.

        :raises InvalidUserException: if the user is not local and creation
            was not requested, or the LDAP lookup finds nothing.
        """
        u = DBSession.query(cls).filter(cls.umid == umid).first()
        if not u:
            if create_if_never_seen:
                u = cls(**cls.__ldap.lookup_umid(umid))
                DBSession.add(u)
                utility.new_user_email(u)
            else:
                raise InvalidUserException()
        return u

    @classmethod
    def from_fuzzy(cls, search_str):
        """Return users whose uniqname, UMID or name contains ``search_str``.

        Matching is a case-insensitive substring match (``ILIKE``).
        """
        pattern = '%{}%'.format(search_str)
        return DBSession.query(cls)\
                        .filter(or_(
                            cls.uniqname.ilike(pattern),
                            cls.umid.ilike(pattern),
                            cls.name.ilike(pattern)
                        )).all()

    @classmethod
    def all(cls):
        """Return every enabled user, ordered by name."""
        return DBSession.query(cls)\
                        .filter(cls.enabled)\
                        .order_by(cls.name)\
                        .all()

    @classmethod
    def count(cls):
        """Count enabled, non-archived users, excluding service accounts."""
        # `== False` / `== True` are intentional: they build SQL expressions.
        return DBSession.query(func.count(cls.id).label('c'))\
                        .filter(cls.role != 'serviceaccount')\
                        .filter(cls.archived == False)\
                        .filter(cls.enabled == True)\
                        .one().c

    @classmethod
    def get_admins(cls):
        """Return enabled users whose role is ``administrator``."""
        return DBSession.query(cls)\
                        .filter(cls.enabled)\
                        .filter(cls.role=='administrator').all()

    @classmethod
    def get_shame_users(cls):
        """Return enabled users with a balance below -5, most negative first."""
        return DBSession.query(cls)\
                        .filter(cls.enabled)\
                        .filter(cls.balance < -5)\
                        .order_by(cls.balance).all()

    @classmethod
    def get_normal_users(cls):
        """Return enabled, non-archived users ordered by name."""
        return DBSession.query(cls)\
                        .filter(cls.enabled)\
                        .filter(cls.archived == False)\
                        .order_by(cls.name).all()

    @classmethod
    def get_archived_users(cls):
        """Return enabled users that are archived, ordered by name."""
        return DBSession.query(cls)\
                        .filter(cls.enabled)\
                        .filter(cls.archived == True)\
                        .order_by(cls.name).all()

    @classmethod
    def get_disabled_users(cls):
        """Return disabled users ordered by name."""
        return DBSession.query(cls)\
                        .filter(cls.enabled == False)\
                        .order_by(cls.name).all()

    @classmethod
    def __balance_total(cls, *criteria):
        """Sum user balances matching ``criteria``; ``Decimal(0.0)`` if none."""
        query = DBSession.query(func.sum(cls.balance).label("total_balance"))
        if criteria:
            query = query.filter(*criteria)
        return query.one().total_balance or Decimal(0.0)

    @classmethod
    def get_users_total(cls):
        """Return the sum of all user balances."""
        return cls.__balance_total()

    # Sum the total amount of money in user accounts that we are holding for
    # users. This is different from just getting the total because it doesn't
    # count users with negative balances
    @classmethod
    def get_amount_held(cls):
        """Return the sum of all positive user balances."""
        return cls.__balance_total(cls.balance > 0)

    @classmethod
    def get_amount_owed(cls):
        """Return the sum of all negative user balances (a negative amount)."""
        return cls.__balance_total(cls.balance < 0)

    @classmethod
    def get_user_count_cumulative(cls):
        """Feed all users' ``created_at`` values, oldest first, to
        ``utility.timeseries_cumulative`` and return its result."""
        rows = DBSession.query(cls.created_at)\
                        .order_by(cls.created_at)\
                        .all()
        return utility.timeseries_cumulative(rows)

    def __make_salt(self):
        """Return a fresh random salt as ASCII text.

        32 bytes from the OS CSPRNG are base64-encoded; the last three
        characters of the encoding (including the trailing newline) are
        dropped, exactly as before.
        """
        # os.urandom replaces an open("/dev/urandom") that was never closed.
        return binascii.b2a_base64(os.urandom(32))[:-3].decode("ascii")

    @staticmethod
    def __hash_password(salt, password):
        """Return the hex SHA-256 digest of ``salt + password``."""
        # NOTE(review): a single SHA-256 round is weak for password storage;
        # switching to a KDF needs a migration of stored hashes.
        return hashlib.sha256((salt + password).encode('utf-8')).hexdigest()

    def __store_password(self, password):
        """Generate a new salt and store the salted hash of ``password``."""
        self._salt = self.__make_salt()
        self._password = self.__hash_password(self._salt, password)

    @hybrid_property
    def password(self):
        """The stored password hash (``None`` when no password is set)."""
        return self._password

    @password.setter
    def password(self, password):
        if password == '':
            # Use this to clear the password so the user can't login
            self._password = None
        else:
            self.__store_password(password)

    def random_password(self):
        """Set a random 6-character alphanumeric password and return it.

        The plaintext is only available from this return value; only its
        salted hash is stored.
        """
        alphabet = string.ascii_letters + string.digits
        # SystemRandom draws from the OS CSPRNG; the default Mersenne
        # Twister generator is predictable and unsuitable for credentials.
        rng = random.SystemRandom()
        password = ''.join(rng.choice(alphabet) for _ in range(6))
        self.__store_password(password)
        return password

    def check_password(self, cand):
        """Return whether ``cand`` matches the stored password.

        Always ``False`` when the user has no salt or no password set.
        """
        import hmac

        if not self._salt or self._password is None:
            return False
        candidate = self.__hash_password(self._salt, cand)
        # Constant-time comparison so timing does not leak digest prefixes.
        return hmac.compare_digest(candidate.encode('utf-8'),
                                   self._password.encode('utf-8'))

    @property
    def has_password(self):
        """Whether a password hash is currently stored."""
        return self._password is not None

    # Cash deposit limit is a quick check on how active the user has been.
    # We basically don't want new users to play around with seeing how much
    # cash they can deposit.
    @property
    def deposit_limit(self):
        """100.0 once both total deposits and total purchases exceed 10.0,
        otherwise 20.0."""
        if self.total_deposits > 10.0 and self.total_purchases > 10.0:
            return 100.0
        else:
            return 20.0
291
292
293
def get_user(request):
    """Return the ``User`` for the request's authenticated userid.

    Returns ``None`` when the request carries no authenticated userid.
    Otherwise the user is fetched by uniqname with ``.one()``, so a userid
    without a matching row raises rather than returning ``None``.
    """
    uniqname = authenticated_userid(request)
    if uniqname:
        matching = DBSession.query(User).filter(User.uniqname == uniqname)
        return matching.one()
    return None
298
299
300
# This lives here, rather than next to Event, due to circular import problems
@property
def __user_from_foreign_key(self):
    """Resolve this object's ``user_id`` to a ``User`` via ``User.from_id``."""
    return User.from_id(self.user_id)
# Attach the property to Event as its ``user`` attribute.
setattr(event.Event, "user", __user_from_foreign_key)
305
306
307
def groupfinder(userid, request):
    """Return the list of group names granted to ``userid`` by its role.

    The user is resolved with ``User.from_uniqname`` (which may create the
    user from LDAP). ``request`` is accepted but not used. A role outside
    the table below yields ``None``.
    Presumably this is registered as the Pyramid authentication callback --
    confirm against the application setup.
    """
    # Built per call so every caller receives its own fresh list.
    groups_by_role = {
        "user": ["user",],
        "manager": ["user","manager"],
        "administrator": ["user","manager","admin","serviceaccount"],
        "serviceaccount": ["serviceaccount"],
    }
    role = User.from_uniqname(userid).role
    return groups_by_role.get(role)
317