whahn1983 /
pycashflow
| 1 | from flask import Blueprint, render_template, redirect, url_for, request, flash, session |
||
| 2 | from flask_login import login_user, login_required, logout_user, current_user |
||
| 3 | from werkzeug.security import generate_password_hash, check_password_hash |
||
| 4 | from .models import User |
||
| 5 | from app import db |
||
| 6 | from .getemail import send_new_user_notification |
||
| 7 | import pandas as pd |
||
| 8 | import os |
||
| 9 | from functools import wraps |
||
| 10 | from werkzeug.exceptions import Unauthorized |
||
| 11 | from corbado_python_sdk import Config, CorbadoSDK, UserEntity |
||
| 12 | |||
| 13 | |||
auth = Blueprint('auth', __name__)

# Cookie-name constant. NOTE(review): nothing in the visible part of this
# module reads it (the cookie lookup below uses 'cbo_session_token') - confirm
# whether it is still needed.
short_session_cookie_name = "cbo_short_session"

# Read environment variables safely (returns None if missing)
API_SECRET = os.getenv("API_SECRET")
PROJECT_ID = os.getenv("PROJECT_ID")
FRONTEND_URI = os.getenv("FRONTEND_URI")

corbado_config = None
# Passkey support is switched on only when all three settings are non-empty.
corbado_enabled = all([API_SECRET, PROJECT_ID, FRONTEND_URI])

# Always bind these names so that code referencing `sdk` / `config` while
# passkeys are disabled sees None instead of hitting an unbound module global.
config = None
sdk = None

# Config has default values for 'short_session_cookie_name' and 'BACKEND_API'
if corbado_enabled:
    # Reuse the values read above instead of indexing os.environ a second time.
    config = Config(
        api_secret=API_SECRET,
        project_id=PROJECT_ID,
        frontend_api=FRONTEND_URI,
        backend_api="https://backendapi.cloud.corbado.io",
    )
    # NOTE(review): looks redundant with the constructor argument above; kept
    # because the original code set it explicitly.
    config.frontend_api = FRONTEND_URI

    # Initialize SDK
    sdk = CorbadoSDK(config=config)
||
| 39 | |||
| 40 | |||
@auth.route('/login')
def login():
    """Serve the login form page."""
    login_page = render_template('login.html')
    return login_page
||
| 44 | |||
| 45 | |||
@auth.route('/login', methods=['POST'])
def login_post():
    """
    Handle the password login form.

    Reads ``email``, ``password`` and ``remember`` from the submitted form.
    On bad credentials or an inactive account the user is flashed a message
    and redirected back to the login page; on success the user is logged in,
    ``session['name']`` / ``session['email']`` are set and the browser is
    redirected to ``main.index``.

    As a side effect this view also repairs the role setup: if no admin exists
    the authenticating user is made admin, and if no global admin exists the
    lowest-id admin is promoted to global admin.
    """
    email = request.form.get('email')
    password = request.form.get('password')
    remember = bool(request.form.get('remember'))

    user = User.query.filter_by(email=email).first()

    # Reject unknown users and wrong passwords with the same message.
    # A missing password field or an empty stored hash is treated as a failed
    # login rather than being passed to check_password_hash(), which would
    # raise on None and turn a bad request into a server error.
    if (
        not user
        or password is None
        or not user.password
        or not check_password_hash(user.password, password)
    ):
        flash('Please check your login details and try again.')
        return redirect(url_for('auth.login'))  # reload the login page

    # fix for no admin user to make current user an admin
    # NOTE(review): this runs before the is_active check below, so an inactive
    # user can be promoted when no admin exists - confirm this is intended.
    user_test = User.query.filter_by(admin=True).first()
    if not user_test:
        user.admin = True
        db.session.commit()

    # ensure there's at least one global admin in the system
    global_admin_test = User.query.filter_by(is_global_admin=True).first()
    if not global_admin_test:
        # Set the first admin user (lowest id) to be a global admin
        first_admin = User.query.filter_by(admin=True).order_by(User.id).first()
        if first_admin:
            first_admin.is_global_admin = True
            db.session.commit()

    # IMPORTANT: Global admins are always active - auto-activate if needed
    if user.is_global_admin and not user.is_active:
        user.is_active = True
        db.session.commit()

    # check if the user account is active (after global admin auto-activation)
    if not user.is_active:
        flash('Your account is pending approval. Please contact an administrator.')
        return redirect(url_for('auth.login'))

    # all checks passed: the credentials are valid and the account is active
    login_user(user, remember=remember)
    session['name'] = user.name
    session['email'] = user.email

    return redirect(url_for('main.index'))
||
| 92 | |||
| 93 | |||
@auth.route('/signup')
def signup():
    """
    Serve the signup form, or the login page when the settings row says so.

    Opens a database connection (``DATABASE_URL`` if usable, otherwise the
    local ``db.sqlite`` file), reads the ``settings`` table and renders
    ``login.html`` when the first row's ``value`` equals 1. Any failure while
    reading the setting is ignored and the signup form is rendered.
    """
    try:
        connection = db.create_engine(os.environ.get('DATABASE_URL')).connect()
    except Exception:
        # DATABASE_URL is unset or unusable: fall back to the local SQLite file.
        connection = db.create_engine('sqlite:///db.sqlite').connect()

    try:
        settings = pd.read_sql('SELECT * FROM settings;', connection)

        # presumably the first settings row is the "signups disabled" flag;
        # verify against the code that writes this table
        if settings['value'][0] == 1:
            return render_template('login.html')
    except Exception:
        # Deliberate best effort: a missing table, empty result or unexpected
        # schema falls through to the signup form below.
        pass
    finally:
        # Release the per-request connection and its engine's pool; previously
        # both were left open on every request.
        connection.close()
        connection.engine.dispose()

    return render_template('signup.html')
||
| 110 | |||
| 111 | |||
@auth.route('/signup', methods=['POST'])
def signup_post():
    """
    Handle the signup form and create a new user.

    Reads ``email``, ``name`` and ``password`` from the submitted form. If the
    email is already registered, or email/password are missing, the user is
    flashed a message and redirected back to the signup page. Otherwise a user
    is stored with a scrypt-hashed password and the browser is redirected to
    the login page.

    When no admin exists yet the new user becomes an active global admin;
    otherwise the new user is an inactive admin and a notification is sent
    via ``send_new_user_notification``.

    NOTE(review): this handler does not consult the ``settings`` table that the
    GET /signup view checks before showing the form - confirm whether signups
    should also be refused here.
    """
    email = request.form.get('email')
    name = request.form.get('name')
    password = request.form.get('password')

    # A missing password would make generate_password_hash() raise, and an
    # empty email/password would create an unusable or trivially guessable
    # account, so reject both up front.
    if not email or not password:
        flash('Email and password are required')
        return redirect(url_for('auth.signup'))

    # if this returns a user, then the email already exists in database
    user = User.query.filter_by(email=email).first()

    if user:  # redirect back to signup page so user can try again
        flash('Email address already exists')
        return redirect(url_for('auth.signup'))

    # Every signup is an admin. Only the very first one (no admin exists yet)
    # is also a global admin and active; later signups stay inactive until
    # approved by a global admin.
    is_first_admin = not User.query.filter_by(admin=True).first()
    admin = True
    is_global_admin = is_first_admin
    is_active = is_first_admin

    # create a new user with the form data. Hash the password so the plaintext version isn't saved.
    new_user = User(
        email=email,
        name=name,
        password=generate_password_hash(password, method='scrypt'),
        admin=admin,
        is_global_admin=is_global_admin,
        is_active=is_active,
    )

    # add the new user to the database
    db.session.add(new_user)
    db.session.commit()

    # Send notification to global admin if this is not the first user
    # (first user becomes global admin, so no need to notify themselves)
    if not is_global_admin:
        try:
            send_new_user_notification(name, email)
        except Exception as e:
            # Don't fail registration if email notification fails
            print(f"Failed to send new user notification: {e}")

    return redirect(url_for('auth.login'))
||
| 161 | |||
| 162 | |||
@auth.route('/logout')
@login_required
def logout():
    """Log the current user out and redirect to the main index."""
    logout_user()
    home_url = url_for('main.index')
    return redirect(home_url)
||
| 168 | |||
| 169 | |||
def admin_required(f):
    """
    Decorator for routes that require admin access.

    Unauthenticated visitors are redirected to the login page; authenticated
    users whose ``admin`` attribute is falsy are redirected to the main index.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        # Guard first, like the sibling decorators in this module: the
        # anonymous user object may not define ``admin``, so reading it
        # without this check could raise instead of redirecting.
        if not current_user.is_authenticated:
            return redirect(url_for('auth.login'))
        if current_user.admin:
            return f(*args, **kwargs)
        return redirect(url_for('main.index'))
    return decorated_function
||
| 178 | |||
| 179 | |||
def global_admin_required(f):
    """
    Restrict a route to global administrators.

    Unauthenticated visitors are redirected to the login page. Authenticated
    users whose ``is_global_admin`` is falsy get a flash message and are
    redirected to the main index; everyone else reaches the wrapped view.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        if not current_user.is_authenticated:
            return redirect(url_for('auth.login'))
        if not current_user.is_global_admin:
            flash('Global admin access required')
            return redirect(url_for('main.index'))
        return f(*args, **kwargs)
    return wrapper
||
| 195 | |||
| 196 | |||
def account_owner_required(f):
    """
    Restrict a route to account owners.

    Unauthenticated visitors are redirected to the login page. Guest users
    (those whose ``account_owner_id`` is set) get a flash message and are
    redirected to the main index; everyone else reaches the wrapped view.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        if not current_user.is_authenticated:
            return redirect(url_for('auth.login'))
        # No owner id recorded means the user is an owner, not a guest.
        if current_user.account_owner_id is None:
            return f(*args, **kwargs)
        flash('Account owner access required')
        return redirect(url_for('main.index'))
    return wrapper
||
| 211 | |||
| 212 | |||
@auth.route('/passkey_login')
def login_passkey():
    """
    Serve the passkey login page when passkey support is configured.

    If ``corbado_enabled`` is false, flash a message and redirect to the
    regular login page instead.
    """
    if not corbado_enabled:
        flash('Passkey authentication is not enabled.')
        return redirect(url_for('auth.login'))

    # The template receives the project id and frontend URI from the environment.
    template_context = {
        'project_id': os.environ['PROJECT_ID'],
        'frontend_uri': os.environ['FRONTEND_URI'],
    }
    return render_template('passkey_login.html', **template_context)
||
| 224 | |||
| 225 | |||
@auth.route('/passkey_login_post')
def login_passkey_post():
    """
    Complete a passkey login using the session cookie.

    Resolves the authenticated passkey user from the request cookie, looks up
    that user's first email identifier through the SDK and logs in the local
    user with that email. Applies the same role repair and account-activation
    rules as the password login view.

    Raises:
        Unauthorized: if no authenticated passkey user can be resolved, or the
            passkey user has no email identifier.
    """
    # Mirror the passkey login page: without configuration there is no SDK to
    # talk to, so send the user back to the regular login.
    if not corbado_enabled:
        flash('Passkey authentication is not enabled.')
        return redirect(url_for('auth.login'))

    auth_user = get_authenticated_user_from_cookie()
    if not auth_user:
        # use more sophisticated error handling in production
        raise Unauthorized()

    email_identifiers = sdk.identifiers.list_all_emails_by_user_id(user_id=auth_user.user_id)
    if not email_identifiers:
        # No email on the passkey account: nothing to match a local user with.
        raise Unauthorized()
    email = email_identifiers[0].value

    user = User.query.filter_by(email=email).first()

    # the passkey identity must map to an existing local user
    if not user:
        flash('Please check your login details and try again.')
        return redirect(url_for('auth.login'))  # reload the login page

    # fix for no admin user to make current user an admin
    user_test = User.query.filter_by(admin=True).first()
    if not user_test:
        user.admin = True
        db.session.commit()

    # ensure there's at least one global admin in the system
    global_admin_test = User.query.filter_by(is_global_admin=True).first()
    if not global_admin_test:
        # Set the first admin user (lowest id) to be a global admin
        first_admin = User.query.filter_by(admin=True).order_by(User.id).first()
        if first_admin:
            first_admin.is_global_admin = True
            db.session.commit()

    # IMPORTANT: Global admins are always active - auto-activate if needed
    if user.is_global_admin and not user.is_active:
        user.is_active = True
        db.session.commit()

    # Enforce account approval here as well; without this check a user whose
    # account is still pending could sign in through the passkey route even
    # though the password route rejects them.
    if not user.is_active:
        flash('Your account is pending approval. Please contact an administrator.')
        return redirect(url_for('auth.login'))

    # all checks passed: the passkey session is valid and the account is active
    login_user(user, remember=True)
    session['name'] = user.name
    session['email'] = user.email

    return redirect(url_for('main.index'))
||
| 266 | |||
| 267 | |||
def get_authenticated_user_from_cookie() -> UserEntity | None:
    """
    Resolve the passkey user from the ``cbo_session_token`` request cookie.

    Returns:
        The result of ``sdk.sessions.validate_token`` for the cookie value, or
        ``None`` when the cookie is absent or empty.

    Raises:
        Unauthorized: if validating the token fails for any reason.
    """
    session_token = request.cookies.get('cbo_session_token')
    if not session_token:
        return None
    try:
        return sdk.sessions.validate_token(session_token)
    except Exception as exc:
        # Catch Exception rather than everything so interpreter-level signals
        # (KeyboardInterrupt, SystemExit) still propagate; keep the original
        # failure attached as the cause for debugging.
        raise Unauthorized() from exc