Passed
Push — master ( cd6825...32c582 )
by William
02:58 queued 01:23
created

app.auth.signup_post()   B

Complexity

Conditions 5

Size

Total Lines 49
Code Lines 32

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 32
nop 0
dl 0
loc 49
rs 8.6453
c 0
b 0
f 0
1
from flask import Blueprint, render_template, redirect, url_for, request, flash, session
2
from flask_login import login_user, login_required, logout_user, current_user
3
from werkzeug.security import generate_password_hash, check_password_hash
4
from .models import User
5
from app import db
6
from .getemail import send_new_user_notification
7
import pandas as pd
8
import os
9
from functools import wraps
10
from werkzeug.exceptions import Unauthorized
11
from corbado_python_sdk import Config, CorbadoSDK, UserEntity
12
13
14
# Blueprint that groups every authentication route defined in this module.
auth = Blueprint('auth', __name__)

short_session_cookie_name = "cbo_short_session"

# Read environment variables safely (os.getenv returns None if missing).
API_SECRET = os.getenv("API_SECRET")
PROJECT_ID = os.getenv("PROJECT_ID")
FRONTEND_URI = os.getenv("FRONTEND_URI")

corbado_config = None
# Passkey (Corbado) support is switched on only when all three settings are
# present and non-empty.
corbado_enabled = all([API_SECRET, PROJECT_ID, FRONTEND_URI])

# Bind ``sdk`` unconditionally so that code referring to it never fails with a
# NameError when Corbado is disabled; it stays None in that case.
sdk = None

# Config has default values for 'short_session_cookie_name' and 'BACKEND_API'.
if corbado_enabled:
    config: Config = Config(
        api_secret=API_SECRET,
        project_id=PROJECT_ID,
        frontend_api=FRONTEND_URI,
        backend_api="https://backendapi.cloud.corbado.io",
    )
    # NOTE(review): this repeats the value already passed to the constructor;
    # kept because the original code performed the same assignment.
    config.frontend_api = FRONTEND_URI

    # Initialize SDK
    sdk = CorbadoSDK(config=config)
39
40
41
@auth.route('/login')
def login():
    """Render and return the ``login.html`` template."""
    login_page = render_template('login.html')
    return login_page
44
45
46
@auth.route('/login', methods=['POST'])
def login_post():
    """Handle the password login form.

    Reads ``email``, ``password`` and ``remember`` from the submitted form.
    On bad credentials or an inactive account, flashes a message and redirects
    back to ``auth.login``. On success, logs the user in, stores the user's
    name and email in the session and redirects to ``main.index``.

    Before logging in, the function also repairs two bootstrap situations:
    if no admin exists the current user is made admin, and if no global admin
    exists the admin with the lowest id is promoted to global admin.
    """
    email = request.form.get('email')
    password = request.form.get('password')
    remember = bool(request.form.get('remember'))

    # Skip the lookup when either credential is missing: a None password must
    # not be handed to check_password_hash(), so treat it as a failed login.
    user = None
    if email and password:
        user = User.query.filter_by(email=email).first()

    # Unknown user or wrong password: same message for both, then reload.
    if not user or not check_password_hash(user.password, password):
        flash('Please check your login details and try again.')
        return redirect(url_for('auth.login'))

    # Fix for "no admin user": make the current user an admin.
    user_test = User.query.filter_by(admin=True).first()
    if not user_test:
        user.admin = 1
        db.session.commit()

    # Ensure there is at least one global admin in the system.
    global_admin_test = User.query.filter_by(is_global_admin=True).first()
    if not global_admin_test:
        # Promote the first admin (lowest id) to global admin.
        first_admin = User.query.filter_by(admin=True).order_by(User.id).first()
        if first_admin:
            first_admin.is_global_admin = True
            db.session.commit()

    # IMPORTANT: global admins are always active - auto-activate if needed.
    if user.is_global_admin and not user.is_active:
        user.is_active = True
        db.session.commit()

    # Account must be active (checked after the global-admin auto-activation).
    if not user.is_active:
        flash('Your account is pending approval. Please contact an administrator.')
        return redirect(url_for('auth.login'))

    # All checks passed: the credentials are valid and the account is active.
    login_user(user, remember=remember)
    session['name'] = user.name
    session['email'] = user.email

    return redirect(url_for('main.index'))
92
93
94
@auth.route('/signup')
def signup():
    """Render the signup form, or the login form when the settings row says so.

    Connects to the database named by the ``DATABASE_URL`` environment
    variable, falling back to ``sqlite:///db.sqlite`` if that fails, and reads
    the ``settings`` table. When the first row's ``value`` equals 1 the
    ``login.html`` template is rendered instead of ``signup.html``
    (presumably a "signups disabled" flag - confirm against the settings UI).
    Any failure while reading the table results in the signup form.
    """
    try:
        engine = db.create_engine(os.environ.get('DATABASE_URL'))
        connection = engine.connect()
    except Exception:
        # DATABASE_URL missing or unusable: fall back to the local SQLite file.
        engine = db.create_engine('sqlite:///db.sqlite')
        connection = engine.connect()

    show_login_instead = False
    try:
        settings = pd.read_sql('SELECT * FROM settings;', connection)
        show_login_instead = bool(settings['value'][0] == 1)
    except Exception:
        # Deliberate best effort: a missing or empty settings table simply
        # means the signup form is shown.
        show_login_instead = False
    finally:
        # Release the per-request connection and its pool instead of leaking
        # them on every page view.
        connection.close()
        engine.dispose()

    if show_login_instead:
        return render_template('login.html')

    return render_template('signup.html')
110
111
112
@auth.route('/signup', methods=['POST'])
def signup_post():
    """Validate the signup form and create the new user.

    Reads ``email``, ``name`` and ``password`` from the submitted form.
    Redirects back to ``auth.signup`` with a flashed message when the email or
    password is missing or the email is already registered. Otherwise stores
    the user with a scrypt password hash and redirects to ``auth.login``.

    Every new user is created as an admin. If no admin existed yet, the new
    user also becomes global admin and is active immediately; otherwise the
    account is inactive and ``send_new_user_notification`` is called
    (a failure there is printed and does not abort the registration).
    """
    email = request.form.get('email')
    name = request.form.get('name')
    password = request.form.get('password')

    # Reject incomplete submissions up front; hashing a missing password
    # would otherwise fail instead of producing a user-facing message.
    if not email or not password:
        flash('Please provide an email address and a password.')
        return redirect(url_for('auth.signup'))

    # An existing row means the email is taken: send the user back to retry.
    if User.query.filter_by(email=email).first():
        flash('Email address already exists')
        return redirect(url_for('auth.signup'))

    # The first admin in the system is also global admin and active; later
    # signups are admins but stay inactive until a global admin approves them.
    is_first_admin = not User.query.filter_by(admin=True).first()

    # Hash the password so the plaintext version is never saved.
    new_user = User(
        email=email,
        name=name,
        password=generate_password_hash(password, method='scrypt'),
        admin=True,
        is_global_admin=is_first_admin,
        is_active=is_first_admin,
    )

    db.session.add(new_user)
    db.session.commit()

    # The first user is the global admin, so there is nobody else to notify.
    if not is_first_admin:
        try:
            send_new_user_notification(name, email)
        except Exception as e:
            # Don't fail registration if the email notification fails.
            print(f"Failed to send new user notification: {e}")

    return redirect(url_for('auth.login'))
161
162
163
@auth.route('/logout')
@login_required
def logout():
    """Log the current user out and redirect to ``main.index``."""
    logout_user()
    landing_url = url_for('main.index')
    return redirect(landing_url)
168
169
170
def admin_required(f):
    """
    Decorator for routes that require admin access.

    Unauthenticated visitors are redirected to ``auth.login``; authenticated
    users whose ``admin`` attribute is falsy are redirected to ``main.index``.
    Otherwise the wrapped view is called with the original arguments.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        # Check authentication first, like the other access decorators in this
        # module, so ``admin`` is never read from an anonymous user object.
        if not current_user.is_authenticated:
            return redirect(url_for('auth.login'))
        if current_user.admin:
            return f(*args, **kwargs)
        return redirect(url_for('main.index'))
    return decorated_function
178
179
180
def global_admin_required(f):
    """
    Restrict a route to global admins.

    Visitors who are not authenticated are redirected to ``auth.login``.
    Authenticated users without ``is_global_admin`` get a flashed message and
    are redirected to ``main.index``. Everyone else reaches the wrapped view.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not current_user.is_authenticated:
            return redirect(url_for('auth.login'))
        if not current_user.is_global_admin:
            flash('Global admin access required')
            return redirect(url_for('main.index'))
        return f(*args, **kwargs)
    return decorated_function
195
196
197
def account_owner_required(f):
    """
    Restrict a route to account owners.

    Visitors who are not authenticated are redirected to ``auth.login``.
    Guest users (those whose ``account_owner_id`` is set) get a flashed
    message and are redirected to ``main.index``. Everyone else reaches the
    wrapped view.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not current_user.is_authenticated:
            return redirect(url_for('auth.login'))
        is_guest = current_user.account_owner_id is not None
        if is_guest:
            flash('Account owner access required')
            return redirect(url_for('main.index'))
        return f(*args, **kwargs)
    return decorated_function
211
212
213
@auth.route('/passkey_login')
def login_passkey():
    """Render the passkey login page, or bounce to the password login.

    When ``corbado_enabled`` is false, a message is flashed and the visitor is
    redirected to ``auth.login``. Otherwise ``passkey_login.html`` is rendered
    with the ``PROJECT_ID`` and ``FRONTEND_URI`` environment values.
    """
    if not corbado_enabled:
        flash('Passkey authentication is not enabled.')
        return redirect(url_for('auth.login'))

    return render_template(
        'passkey_login.html',
        project_id=os.environ['PROJECT_ID'],
        frontend_uri=os.environ['FRONTEND_URI'],
    )
224
225
226
@auth.route('/passkey_login_post')
def login_passkey_post():
    """Complete a passkey login using the Corbado session cookie.

    Resolves the authenticated Corbado user from the request cookie, looks up
    that user's first email identifier and logs in the local ``User`` with the
    same email. On success the user's name and email are stored in the session
    and the browser is redirected to ``main.index``.

    The same bootstrap and activation rules as the password login apply:
    a missing admin / global admin is repaired, global admins are
    auto-activated, and inactive accounts are refused.

    Raises:
        Unauthorized: if no authenticated Corbado user is found or that user
            has no email identifier.
    """
    # Static analysis flagged that ``sdk`` is not defined when Corbado is
    # disabled; refuse the request here before ``sdk`` is ever touched.
    if not corbado_enabled:
        flash('Passkey authentication is not enabled.')
        return redirect(url_for('auth.login'))

    auth_user = get_authenticated_user_from_cookie()
    if not auth_user:
        # use more sophisticated error handling in production
        raise Unauthorized()

    email_identifiers = sdk.identifiers.list_all_emails_by_user_id(user_id=auth_user.user_id)
    if not email_identifiers:
        # No email on the passkey account: nothing to match a local user with.
        raise Unauthorized()
    email = email_identifiers[0].value

    user = User.query.filter_by(email=email).first()

    # The passkey identity must correspond to an existing local user.
    if not user:
        flash('Please check your login details and try again.')
        return redirect(url_for('auth.login'))

    # Fix for "no admin user": make the current user an admin.
    user_test = User.query.filter_by(admin=True).first()
    if not user_test:
        user.admin = 1
        db.session.commit()

    # Ensure there is at least one global admin in the system.
    global_admin_test = User.query.filter_by(is_global_admin=True).first()
    if not global_admin_test:
        # Promote the first admin (lowest id) to global admin.
        first_admin = User.query.filter_by(admin=True).order_by(User.id).first()
        if first_admin:
            first_admin.is_global_admin = True
            db.session.commit()

    # Global admins are always active - auto-activate if needed.
    if user.is_global_admin and not user.is_active:
        user.is_active = True
        db.session.commit()

    # Enforce the same approval gate as the password login so a pending
    # account cannot sidestep approval by signing in with a passkey.
    if not user.is_active:
        flash('Your account is pending approval. Please contact an administrator.')
        return redirect(url_for('auth.login'))

    # All checks passed: log the user in (passkey logins are always remembered).
    login_user(user, remember=True)
    session['name'] = user.name
    session['email'] = user.email

    return redirect(url_for('main.index'))
266
267
268
def get_authenticated_user_from_cookie() -> UserEntity | None:
    """Return the Corbado user for the request's session cookie, if any.

    Reads the ``cbo_session_token`` cookie and passes it to
    ``sdk.sessions.validate_token``.

    Returns:
        The value returned by ``validate_token``, or ``None`` when the cookie
        is absent or empty.

    Raises:
        Unauthorized: if Corbado is not enabled or token validation fails.
    """
    # NOTE(review): the module also defines short_session_cookie_name
    # ("cbo_short_session"), which is not the cookie read here - confirm
    # which name the frontend actually sets.
    session_token = request.cookies.get('cbo_session_token')
    if not session_token:
        return None

    # Static analysis flagged that ``sdk`` is not defined when Corbado is
    # disabled; fail explicitly instead of relying on a swallowed NameError.
    if not corbado_enabled:
        raise Unauthorized()

    try:
        return sdk.sessions.validate_token(session_token)
    except Exception as err:
        # Any validation failure is reported to the client as 401.
        raise Unauthorized() from err