Completed
Push — master ( ab9b19...2e7413 )
by Pat
01:15
created

User.iterate_recent_items()   C

Complexity

Conditions 10

Size

Total Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 10
dl 0
loc 14
rs 6
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like User.iterate_recent_items() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
from pyramid.security import authenticated_userid
2
import hashlib
3
import binascii
4
import os
5
from .model import *
6
from . import account
7
from . import event
8
from . import request
9
from . import request_post
10
from chezbetty import utility
11
12
import ldap3
13
import random
14
import string
15
16
class InvalidUserException(Exception):
    """Raised when a user cannot be found (locally or in the directory)."""
18
19
20
class LDAPLookup(object):
    """Look up an individual in the UM directory over LDAP.

    Lookups are keyed on either the Michigan ID number (the ``entityid``
    attribute) or the uniqname (the ``uid`` attribute).

    ``SERVER``, ``USERNAME`` and ``PASSWORD`` default to ``None`` and must be
    assigned before the first lookup (presumably from application
    configuration -- verify against the code that sets them).
    """

    SERVER = None
    USERNAME = None
    BASE_DN = "ou=People,dc=umich,dc=edu"
    PASSWORD = None
    ATTRIBUTES = ["uid", "entityid", "displayName"]
    # http://www.itcs.umich.edu/itcsdocs/r1463/attributes-for-ldap.html
    DETAILS_ATTRIBUTES = [
            # Could be interesting but require extra perm's from ITS
            #"umichInstRoles",
            #"umichAlumStatus",
            #"umichAAAcadProgram",
            #"umichAATermStatus",
            #"umichHR",
            "notice",
            "ou",
            "umichDescription",
            "umichTitle",
            ]

    # Characters that carry meaning inside an LDAP search filter and their
    # escaped form (RFC 4515, section 3).
    _FILTER_ESCAPES = str.maketrans({
        "\\": "\\5c",
        "*": "\\2a",
        "(": "\\28",
        ")": "\\29",
        "\x00": "\\00",
    })

    def __init__(self):
        # The connection is opened lazily on the first lookup.
        self.__conn = None

    @staticmethod
    def _escape_filter_value(value):
        """Return ``value`` as a string that is safe inside a search filter.

        Without this, a value such as ``*`` or ``a)(uid=*`` would change the
        meaning of the filter instead of being matched literally.
        """
        return str(value).translate(LDAPLookup._FILTER_ESCAPES)

    def __connect(self):
        """Open (and cache) an LDAPS connection if none is open yet."""
        if not self.__conn:
            s = ldap3.Server(self.SERVER, port=636, use_ssl=True, get_info=ldap3.GET_ALL_INFO)
            self.__conn = ldap3.Connection(s, auto_bind=True,
                    user=self.USERNAME, password=self.PASSWORD,
                    client_strategy=ldap3.STRATEGY_SYNC,
                    authentication=ldap3.AUTH_SIMPLE
            )

    def __search(self, query, attributes):
        """Run ``query`` under ``BASE_DN`` on the cached connection."""
        self.__conn.search(self.BASE_DN,
                query,
                ldap3.SEARCH_SCOPE_WHOLE_SUBTREE,
                attributes=attributes
        )

    def __do_lookup(self, k, v, attributes, full_dict=False):
        """Search for the entry whose attribute ``k`` equals ``v``.

        Args:
            k: LDAP attribute name to match on (e.g. ``"uid"``).
            v: Value to match; escaped before being placed in the filter.
            attributes: Attribute names to request from the server.
            full_dict: When true, return the entry's raw attribute mapping
                instead of the condensed ``umid``/``uniqname``/``name`` dict.

        Raises:
            InvalidUserException: If the search returns no entries.
        """
        query = "(%s=%s)" % (k, self._escape_filter_value(v))

        self.__connect()
        try:
            self.__search(query, attributes)
        except Exception:
            # sometimes our connections time out: reconnect and retry once
            self.__conn = None
            self.__connect()
            self.__search(query, attributes)

        response = self.__conn.response
        if not response:
            raise InvalidUserException()

        entry = response[0]["attributes"]
        if full_dict:
            return entry

        # NOTE(review): "entityid" is returned as-is while the other two
        # attributes take element [0]; kept as in the original -- confirm
        # this asymmetry is intended.
        return {
            "umid": entry["entityid"],
            "uniqname": entry["uid"][0],
            "name": entry["displayName"][0],
        }

    def __lookup(self, k, v):
        """Basic lookup returning the condensed user dict."""
        return self.__do_lookup(k, v, self.ATTRIBUTES)

    def __detail_lookup(self, k, v):
        """Lookup requesting the extra detail attributes; returns the raw mapping."""
        return self.__do_lookup(k, v,
                self.ATTRIBUTES + self.DETAILS_ATTRIBUTES,
                full_dict=True,
                )

    def lookup_umid(self, umid, details=False):
        """Look a person up by Michigan ID number (``entityid``)."""
        if details:
            return self.__detail_lookup("entityid", umid)
        return self.__lookup("entityid", umid)

    def lookup_uniqname(self, uniqname, details=False):
        """Look a person up by uniqname (``uid``)."""
        if details:
            return self.__detail_lookup("uid", uniqname)
        return self.__lookup("uid", uniqname)
107
108
109
class User(account.Account):
    """A user account, stored in the ``users`` table.

    Extends ``account.Account`` (polymorphic identity ``'user'``) with the
    directory identifiers (uniqname / UMID), login credentials and role.
    Users not yet known locally can be created from the LDAP directory.
    """

    __tablename__ = 'users'
    __mapper_args__ = {'polymorphic_identity': 'user'}

    id        = Column(Integer, ForeignKey("accounts.id"), primary_key=True)
    uniqname  = Column(String(8), nullable=False, unique=True)
    umid      = Column(String(8), unique=True)
    _password = Column("password", String(255))
    _salt     = Column("salt", String(255))
    enabled   = Column(Boolean, nullable=False, default=True)
    archived  = Column(Boolean, nullable=False, default=False)
    role      = Column(Enum("user", "serviceaccount", "manager", "administrator", name="user_type"),
                       nullable=False, default="user")

    administrative_events = relationship(event.Event, foreign_keys=[event.Event.user_id], backref="admin")
    events_deleted        = relationship(event.Event, foreign_keys=[event.Event.deleted_user_id], backref="deleted_user")
    requests              = relationship(request.Request, foreign_keys=[request.Request.user_id], backref="user")
    request_posts         = relationship(
                              request_post.RequestPost,
                              foreign_keys=[request_post.RequestPost.user_id],
                              backref="user",
                            )

    # Shared directory client used to create users that are not in the DB yet.
    __ldap = LDAPLookup()

    def __init__(self, uniqname, umid, name):
        """Create an enabled user with a zero balance."""
        self.enabled = True
        self.uniqname = uniqname
        self.umid = umid
        self.name = name
        self.balance = 0.0

    def __str__(self):
        return "<User: id {}, uniqname {}, umid {}, name {}, balance {}>".\
                format(self.id, self.uniqname, self.umid, self.name, self.balance)

    # ------------------------------------------------------------------
    # Lookups
    # ------------------------------------------------------------------

    @classmethod
    def from_id(cls, id):
        """Return the user with primary key ``id`` (``.one()``: must exist)."""
        return DBSession.query(cls).filter(cls.id == id).one()

    def get_details(self):
        """Return the detailed directory attributes for this user's uniqname."""
        return self.__ldap.lookup_uniqname(self.uniqname, details=True)

    @classmethod
    def from_uniqname(cls, uniqname, local_only=False):
        """Return the user with this uniqname.

        If no local row exists and ``local_only`` is false, the user is looked
        up in the directory, created and added to the session.  With
        ``local_only`` true, ``None`` is returned for an unknown uniqname.
        """
        u = DBSession.query(cls).filter(cls.uniqname == uniqname).first()
        if not u and not local_only:
            u = cls(**cls.__ldap.lookup_uniqname(uniqname))
            DBSession.add(u)
        return u

    @classmethod
    def from_umid(cls, umid, create_if_never_seen=False):
        """Return the user with this UMID.

        Raises:
            InvalidUserException: If the UMID is unknown locally and
                ``create_if_never_seen`` is false (the directory lookup used
                when it is true may raise it as well).
        """
        u = DBSession.query(cls).filter(cls.umid == umid).first()
        if u:
            return u
        if not create_if_never_seen:
            raise InvalidUserException()

        u = cls(**cls.__ldap.lookup_umid(umid))
        DBSession.add(u)
        # presumably notifies the newly created user by e-mail -- see utility
        utility.new_user_email(u)
        return u

    @classmethod
    def from_fuzzy(cls, search_str, any=True):
        """Return users whose uniqname, UMID or name contains ``search_str``.

        The match is case-insensitive (``ILIKE``).  When ``any`` is false only
        enabled, non-archived users are returned.  (``any`` shadows the
        builtin, but it is part of the public signature and therefore kept.)
        """
        pattern = '%{}%'.format(search_str)
        q = DBSession.query(cls).filter(or_(
            cls.uniqname.ilike(pattern),
            cls.umid.ilike(pattern),
            cls.name.ilike(pattern),
        ))
        if not any:
            # SQLAlchemy column comparison: must be ``==``, not ``is``.
            q = q.filter(cls.enabled)\
                 .filter(cls.archived == False)  # noqa: E712

        return q.all()

    @classmethod
    def all(cls):
        """Return every enabled user, ordered by name."""
        return DBSession.query(cls)\
                        .filter(cls.enabled)\
                        .order_by(cls.name)\
                        .all()

    @classmethod
    def count(cls):
        """Count enabled, non-archived users, excluding service accounts."""
        return DBSession.query(func.count(cls.id).label('c'))\
                        .filter(cls.role != 'serviceaccount')\
                        .filter(cls.archived == False)\
                        .filter(cls.enabled == True)\
                        .one().c  # noqa: E712 (SQL expressions above)

    @classmethod
    def get_admins(cls):
        """Return enabled users whose role is ``administrator``."""
        return DBSession.query(cls)\
                        .filter(cls.enabled)\
                        .filter(cls.role == 'administrator').all()

    @classmethod
    def get_shame_users(cls):
        """Return enabled users with a balance below -5, most indebted first."""
        return DBSession.query(cls)\
                        .filter(cls.enabled)\
                        .filter(cls.balance < -5)\
                        .order_by(cls.balance).all()

    @classmethod
    def get_normal_users(cls):
        """Return enabled, non-archived users ordered by name."""
        return DBSession.query(cls)\
                        .filter(cls.enabled)\
                        .filter(cls.archived == False)\
                        .order_by(cls.name).all()  # noqa: E712

    @classmethod
    def get_archived_users(cls):
        """Return enabled, archived users ordered by name."""
        return DBSession.query(cls)\
                        .filter(cls.enabled)\
                        .filter(cls.archived == True)\
                        .order_by(cls.name).all()  # noqa: E712

    @classmethod
    def get_disabled_users(cls):
        """Return disabled users ordered by name."""
        return DBSession.query(cls)\
                        .filter(cls.enabled == False)\
                        .order_by(cls.name).all()  # noqa: E712

    # ------------------------------------------------------------------
    # Balance aggregates
    # ------------------------------------------------------------------

    @classmethod
    def __sum_balance(cls, *criteria):
        """Sum ``balance`` over users matching ``criteria``; 0 when no rows."""
        q = DBSession.query(func.sum(cls.balance).label("total_balance"))
        if criteria:
            q = q.filter(*criteria)
        return q.one().total_balance or Decimal(0.0)

    @classmethod
    def get_users_total(cls):
        """Return the sum of all user balances."""
        return cls.__sum_balance()

    # Sum the total amount of money in user accounts that we are holding for
    # users. This is different from just getting the total because it doesn't
    # count users with negative balances
    @classmethod
    def get_amount_held(cls):
        """Return the sum of all positive user balances."""
        return cls.__sum_balance(cls.balance > 0)

    @classmethod
    def get_amount_owed(cls):
        """Return the (negative) sum of all negative user balances."""
        return cls.__sum_balance(cls.balance < 0)

    @classmethod
    def get_user_count_cumulative(cls):
        """Return ``utility.timeseries_cumulative`` over user creation times."""
        rows = DBSession.query(cls.created_at)\
                        .order_by(cls.created_at)\
                        .all()
        return utility.timeseries_cumulative(rows)

    # ------------------------------------------------------------------
    # Purchase history
    # ------------------------------------------------------------------

    def __iter_purchased_items(self):
        """Yield the item of every line item in this user's purchases."""
        for e in self.events:
            if e.type != 'purchase':
                continue
            for transaction in e.transactions:
                if transaction.type != 'purchase':
                    continue
                for line_item in transaction.subtransactions:
                    yield line_item.item

    def iterate_recent_items(self, limit=None, allow_duplicates=False):
        """Yield items this user has purchased, in ``self.events`` order.

        Args:
            limit: Maximum number of items to yield; ``None`` for no limit.
            allow_duplicates: When false (the default) each distinct item is
                yielded only the first time it is seen.

        The iteration order is whatever order ``self.events`` provides
        (presumably most recent first -- verify on the relationship).
        """
        if limit is not None and limit <= 0:
            return

        seen = set()
        yielded = 0
        for item in self.__iter_purchased_items():
            if item in seen and not allow_duplicates:
                continue
            yield item
            seen.add(item)
            yielded += 1
            # Stop as soon as the quota is met rather than scanning on to
            # the next candidate item.
            if limit is not None and yielded >= limit:
                return

    # ------------------------------------------------------------------
    # Passwords
    # ------------------------------------------------------------------
    # NOTE(review): passwords are stored as a single round of salted SHA-256.
    # A dedicated password KDF would be stronger, but switching would
    # invalidate every stored hash, so the scheme is left unchanged here.

    def __make_salt(self):
        """Return a fresh random salt as ASCII text."""
        # os.urandom instead of opening /dev/urandom by hand, which leaked the
        # file handle.  The [:-3] slice is kept so salts have the same length
        # and alphabet as before.
        return binascii.b2a_base64(os.urandom(32))[:-3].decode("ascii")

    @staticmethod
    def __hash_password(salt, password):
        """Return the hex SHA-256 digest of ``salt + password``."""
        return hashlib.sha256((salt + password).encode('utf-8')).hexdigest()

    def __set_password(self, password):
        """Store ``password`` hashed under a newly generated salt."""
        self._salt = self.__make_salt()
        self._password = self.__hash_password(self._salt, password)

    @hybrid_property
    def password(self):
        """The stored password hash (not the plaintext)."""
        return self._password

    @password.setter
    def password(self, password):
        if password == '':
            # Use this to clear the password so the user can't login
            self._password = None
        else:
            self.__set_password(password)

    def random_password(self):
        """Set a random 6-character password and return its plaintext."""
        # SystemRandom draws from the OS CSPRNG; the default generator is
        # predictable and unsuitable for credentials.
        rng = random.SystemRandom()
        alphabet = string.ascii_letters + string.digits
        password = ''.join(rng.choice(alphabet) for _ in range(6))
        self.__set_password(password)
        return password

    def check_password(self, cand):
        """Return True if ``cand`` matches the stored password."""
        import hmac  # only needed here; kept local so the block stands alone

        # No salt, or a password that has been cleared, can never match.
        if not self._salt or self._password is None:
            return False
        candidate = self.__hash_password(self._salt, cand)
        # Constant-time comparison to avoid leaking how many characters match.
        return hmac.compare_digest(candidate.encode('utf-8'),
                                   self._password.encode('utf-8'))

    @property
    def has_password(self):
        """True if a password hash is stored for this user."""
        return self._password is not None

    # Cash deposit limit is now fixed at $2 because we have a bill acceptor
    @property
    def deposit_limit(self):
        return 2.0
315
316
317
def get_user(request):
    """Return the ``User`` for the request's authenticated userid.

    Returns ``None`` when the request carries no authenticated userid.  The
    userid is matched against ``User.uniqname``; the query uses ``.one()``,
    so a userid with no matching row raises rather than returning ``None``.
    """
    login = authenticated_userid(request)
    if not login:
        return None
    return DBSession.query(User).filter(User.uniqname == login).one()
322
323
324
# This is in a stupid place due to circular import problems: the event module
# is imported by this one, so the ``user`` accessor is attached to
# ``event.Event`` from here.
@property
def __user_from_foreign_key(self):
    """Return the ``User`` referenced by this object's ``user_id``."""
    return User.from_id(self.user_id)
event.Event.user = __user_from_foreign_key
329
330
331
def groupfinder(userid, request):
    """Return the list of group names granted to ``userid``.

    The user is resolved with ``User.from_uniqname`` and the groups are
    derived from the user's ``role``.  Returns ``None`` for a role that is not
    listed below.  ``request`` is accepted but not used.
    """
    # Built per call so every caller gets its own, freely mutable list.
    groups_by_role = {
        "user": ["user"],
        "manager": ["user", "manager"],
        "administrator": ["user", "manager", "admin", "serviceaccount"],
        "serviceaccount": ["serviceaccount"],
    }
    user = User.from_uniqname(userid)
    return groups_by_role.get(user.role)
341