Passed
Push — master ( ac4bf6...54b718 )
by William
01:41 queued 11s
created

app.auth.global_admin_required()   A

Complexity

Conditions 3

Size

Total Lines 15
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 10
nop 1
dl 0
loc 15
rs 9.9
c 0
b 0
f 0
1
from flask import Blueprint, render_template, redirect, url_for, request, flash, session
2
from flask_login import login_user, login_required, logout_user, current_user
3
from werkzeug.security import generate_password_hash, check_password_hash
4
from .models import User
5
from app import db
6
import pandas as pd
7
import os
8
from functools import wraps
9
from werkzeug.exceptions import Unauthorized
10
from corbado_python_sdk import Config, CorbadoSDK, UserEntity
11
12
13
auth = Blueprint('auth', __name__)
14
15
16
short_session_cookie_name = "cbo_short_session"
17
# Config has a default values for 'short_session_cookie_name' and 'BACKEND_API'
18
config: Config = Config(
19
    api_secret=os.environ['API_SECRET'],
20
    project_id=os.environ['PROJECT_ID'],
21
    frontend_api=os.environ['FRONTEND_URI'],
22
    backend_api="https://backendapi.cloud.corbado.io",
23
)
24
config.frontend_api = os.environ['FRONTEND_URI']
25
26
# Initialize SDK
27
sdk: CorbadoSDK = CorbadoSDK(config=config)
28
29
30
@auth.route('/login')
31
def login():
32
    return render_template('login.html')
33
34
35
@auth.route('/login', methods=['POST'])
36
def login_post():
37
    # login code goes here
38
    email = request.form.get('email')
39
    password = request.form.get('password')
40
    remember = True if request.form.get('remember') else False
41
42
    user = User.query.filter_by(email=email).first()
43
44
    # check if the user actually exists
45
    # take the user-supplied password, hash it, and compare it to the hashed password in the database
46
    if not user or not check_password_hash(user.password, password):
47
        flash('Please check your login details and try again.')
48
        return redirect(url_for('auth.login')) # if the user doesn't exist or password is wrong, reload the page
49
50
    # fix for no admin user to make current user an admin
51
    user_test = User.query.filter_by(admin=True).first()
52
    if not user_test:
53
        user.admin = 1
54
        db.session.commit()
55
56
    # ensure there's at least one global admin in the system
57
    global_admin_test = User.query.filter_by(is_global_admin=True).first()
58
    if not global_admin_test:
59
        # Set the first admin user to be a global admin
60
        first_admin = User.query.filter_by(admin=True).order_by(User.id).first()
61
        if first_admin:
62
            first_admin.is_global_admin = True
63
            db.session.commit()
64
65
    # IMPORTANT: Global admins are always active - auto-activate if needed
66
    if user.is_global_admin and not user.is_active:
67
        user.is_active = True
68
        db.session.commit()
69
70
    # check if the user account is active (after global admin auto-activation)
71
    if not user.is_active:
72
        flash('Your account is pending approval. Please contact an administrator.')
73
        return redirect(url_for('auth.login'))
74
75
    # if the above check passes, then we know the user has the right credentials
76
    login_user(user, remember=remember)
77
    session['name'] = user.name
78
    session['email'] = user.email
79
80
    return redirect(url_for('main.index'))
81
82
83
@auth.route('/signup')
84
def signup():
85
    try:
86
        engine = db.create_engine(os.environ.get('DATABASE_URL')).connect()
87
    except:
88
        engine = db.create_engine('sqlite:///db.sqlite').connect()
89
90
    try:
91
        df = pd.read_sql('SELECT * FROM settings;', engine)
92
93
        if df['value'][0] == 1:
94
            return render_template('login.html')
95
    except:
96
        pass
97
98
    return render_template('signup.html')
99
100
101
@auth.route('/signup', methods=['POST'])
102
def signup_post():
103
    # code to validate and add user to database goes here
104
    email = request.form.get('email')
105
    name = request.form.get('name')
106
    password = request.form.get('password')
107
108
    user = User.query.filter_by(email=email).first() # if this returns a user, then the email already exists in database
109
110
    if user: # if a user is found, we want to redirect back to signup page so user can try again
111
        flash('Email address already exists')
112
        return redirect(url_for('auth.signup'))
113
114
    # if no admin user, make new user an admin AND global admin AND active
115
    user_test = User.query.filter_by(admin=True).first()
116
    if not user_test:
117
        admin = True
118
        is_global_admin = True
119
        is_active = True
120
    else:
121
        # New signups are admins but inactive until approved by global admin
122
        admin = True
123
        is_global_admin = False
124
        is_active = False
125
126
    # create a new user with the form data. Hash the password so the plaintext version isn't saved.
127
    new_user = User(
128
        email=email,
129
        name=name,
130
        password=generate_password_hash(password, method='scrypt'),
131
        admin=admin,
132
        is_global_admin=is_global_admin,
133
        is_active=is_active
134
    )
135
136
    # add the new user to the database
137
    db.session.add(new_user)
138
    db.session.commit()
139
140
    return redirect(url_for('auth.login'))
141
142
143
@auth.route('/logout')
144
@login_required
145
def logout():
146
    logout_user()
147
    return redirect(url_for('main.index'))
148
149
150
def admin_required(f):
151
    @wraps(f)
152
    def decorated_function(*args, **kwargs):
153
        if current_user.admin:
154
            return f(*args, **kwargs)
155
        else:
156
            return redirect(url_for('main.index'))
157
    return decorated_function
158
159
160
def global_admin_required(f):
161
    """
162
    Decorator for routes that require global admin access.
163
    Only users with is_global_admin=True can access.
164
    """
165
    @wraps(f)
166
    def decorated_function(*args, **kwargs):
167
        if not current_user.is_authenticated:
168
            return redirect(url_for('auth.login'))
169
        if current_user.is_global_admin:
170
            return f(*args, **kwargs)
171
        else:
172
            flash('Global admin access required')
173
            return redirect(url_for('main.index'))
174
    return decorated_function
175
176
177
def account_owner_required(f):
178
    """
179
    Decorator for routes that require account owner access.
180
    Guest users (those with account_owner_id set) cannot access.
181
    """
182
    @wraps(f)
183
    def decorated_function(*args, **kwargs):
184
        if not current_user.is_authenticated:
185
            return redirect(url_for('auth.login'))
186
        if current_user.account_owner_id is not None:
187
            flash('Account owner access required')
188
            return redirect(url_for('main.index'))
189
        return f(*args, **kwargs)
190
    return decorated_function
191
192
193
@auth.route('/passkey_login')
194
def login_passkey():
195
    project_id = os.environ['PROJECT_ID']
196
    frontend_uri = os.environ['FRONTEND_URI']
197
198
    return render_template('passkey_login.html', project_id=project_id, frontend_uri=frontend_uri)
199
200
201
@auth.route('/passkey_login_post')
202
def login_passkey_post():
203
204
    auth_user = get_authenticated_user_from_cookie()
205
    if auth_user:
206
        email_identifiers = sdk.identifiers.list_all_emails_by_user_id(user_id=auth_user.user_id)
207
        email = email_identifiers[0].value
208
    else:
209
        # use more sophisticated error handling in production
210
        raise Unauthorized()
211
212
    user = User.query.filter_by(email=email).first()
213
214
    # check if the user actually exists
215
    # take the user-supplied password, hash it, and compare it to the hashed password in the database
216
    if not user:
217
        flash('Please check your login details and try again.')
218
        return redirect(url_for('auth.login'))  # if the user doesn't exist or password is wrong, reload the page
219
220
    # fix for no admin user to make current user an admin
221
    user_test = User.query.filter_by(admin=True).first()
222
    if not user_test:
223
        user.admin = 1
224
        db.session.commit()
225
226
    # ensure there's at least one global admin in the system
227
    global_admin_test = User.query.filter_by(is_global_admin=True).first()
228
    if not global_admin_test:
229
        # Set the first admin user to be a global admin
230
        first_admin = User.query.filter_by(admin=True).order_by(User.id).first()
231
        if first_admin:
232
            first_admin.is_global_admin = True
233
            db.session.commit()
234
235
    # if the above check passes, then we know the user has the right credentials
236
    login_user(user, remember=True)
237
    session['name'] = user.name
238
    session['email'] = user.email
239
240
    return redirect(url_for('main.index'))
241
242
243
def get_authenticated_user_from_cookie() -> UserEntity | None:
244
    session_token = request.cookies.get('cbo_session_token')
245
    if not session_token:
246
        return None
247
    try:
248
        return sdk.sessions.validate_token(session_token)
249
    except:
250
        raise Unauthorized()