Passed
Push — master ( f6f0c8...bd590d )
by William
03:42
created

app.auth.get_authenticated_user_from_cookie()   A

Complexity

Conditions 3

Size

Total Lines 8
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 8
nop 0
dl 0
loc 8
rs 10
c 0
b 0
f 0
1
from flask import Blueprint, render_template, redirect, url_for, request, flash, session
2
from flask_login import login_user, login_required, logout_user, current_user
3
from werkzeug.security import generate_password_hash, check_password_hash
4
from .models import User
5
from app import db
6
import pandas as pd
7
import os
8
from functools import wraps
9
from werkzeug.exceptions import Unauthorized
10
from corbado_python_sdk import Config, CorbadoSDK, UserEntity
11
12
13
auth = Blueprint('auth', __name__)
14
15
16
short_session_cookie_name = "cbo_short_session"
17
# Config has a default values for 'short_session_cookie_name' and 'BACKEND_API'
18
config: Config = Config(
19
    api_secret=os.environ['API_SECRET'],
20
    project_id=os.environ['PROJECT_ID'],
21
    frontend_api=os.environ['FRONTEND_URI'],
22
    backend_api="https://backendapi.cloud.corbado.io",
23
)
24
config.frontend_api = os.environ['FRONTEND_URI']
25
26
# Initialize SDK
27
sdk: CorbadoSDK = CorbadoSDK(config=config)
28
29
30
@auth.route('/login')
31
def login():
32
    return render_template('login.html')
33
34
35
@auth.route('/login', methods=['POST'])
36
def login_post():
37
    # login code goes here
38
    email = request.form.get('email')
39
    password = request.form.get('password')
40
    remember = True if request.form.get('remember') else False
41
42
    user = User.query.filter_by(email=email).first()
43
44
    # check if the user actually exists
45
    # take the user-supplied password, hash it, and compare it to the hashed password in the database
46
    if not user or not check_password_hash(user.password, password):
47
        flash('Please check your login details and try again.')
48
        return redirect(url_for('auth.login')) # if the user doesn't exist or password is wrong, reload the page
49
50
    # fix for no admin user to make current user an admin
51
    user_test = User.query.filter_by(admin=True).first()
52
    if not user_test:
53
        user.admin = 1
54
        db.session.commit()
55
56
    # if the above check passes, then we know the user has the right credentials
57
    login_user(user, remember=remember)
58
    session['name'] = user.name
59
    session['email'] = user.email
60
61
    return redirect(url_for('main.index'))
62
63
64
@auth.route('/signup')
65
def signup():
66
    try:
67
        engine = db.create_engine(os.environ.get('DATABASE_URL')).connect()
68
    except:
69
        engine = db.create_engine('sqlite:///db.sqlite').connect()
70
71
    try:
72
        df = pd.read_sql('SELECT * FROM settings;', engine)
73
74
        if df['value'][0] == 1:
75
            return render_template('login.html')
76
    except:
77
        pass
78
79
    return render_template('signup.html')
80
81
82
@auth.route('/signup', methods=['POST'])
83
def signup_post():
84
    # code to validate and add user to database goes here
85
    email = request.form.get('email')
86
    name = request.form.get('name')
87
    password = request.form.get('password')
88
89
    user = User.query.filter_by(email=email).first() # if this returns a user, then the email already exists in database
90
91
    if user: # if a user is found, we want to redirect back to signup page so user can try again
92
        return redirect(url_for('auth.signup'))
93
94
    # if no admin user, make new user an admin
95
    user_test = User.query.filter_by(admin=True).first()
96
    if not user_test:
97
        admin = 1
98
    else:
99
        admin = 0
100
101
    # create a new user with the form data. Hash the password so the plaintext version isn't saved.
102
    new_user = User(email=email, name=name, password=generate_password_hash(password, method='scrypt'), admin=admin)
103
104
    # add the new user to the database
105
    db.session.add(new_user)
106
    db.session.commit()
107
    if user:  # if a user is found, we want to redirect back to signup page so user can try again
108
        flash('Email address already exists')
109
    return redirect(url_for('auth.login'))
110
111
112
@auth.route('/logout')
113
@login_required
114
def logout():
115
    logout_user()
116
    return redirect(url_for('main.index'))
117
118
119
def admin_required(f):
120
    @wraps(f)
121
    def decorated_function(*args, **kwargs):
122
        if current_user.admin:
123
            return f(*args, **kwargs)
124
        else:
125
            return redirect(url_for('main.index'))
126
    return decorated_function
127
128
129
@auth.route('/passkey_login')
130
def login_passkey():
131
    project_id = os.environ['PROJECT_ID']
132
    frontend_uri = os.environ['FRONTEND_URI']
133
134
    return render_template('passkey_login.html', project_id=project_id, frontend_uri=frontend_uri)
135
136
137
@auth.route('/passkey_login_post')
138
def login_passkey_post():
139
140
    auth_user = get_authenticated_user_from_cookie()
141
    if auth_user:
142
        email_identifiers = sdk.identifiers.list_all_emails_by_user_id(user_id=auth_user.user_id)
143
        email = email_identifiers[0].value
144
    else:
145
        # use more sophisticated error handling in production
146
        raise Unauthorized()
147
148
    user = User.query.filter_by(email=email).first()
149
150
    # check if the user actually exists
151
    # take the user-supplied password, hash it, and compare it to the hashed password in the database
152
    if not user:
153
        flash('Please check your login details and try again.')
154
        return redirect(url_for('auth.login'))  # if the user doesn't exist or password is wrong, reload the page
155
156
    # fix for no admin user to make current user an admin
157
    user_test = User.query.filter_by(admin=True).first()
158
    if not user_test:
159
        user.admin = 1
160
        db.session.commit()
161
162
    # if the above check passes, then we know the user has the right credentials
163
    login_user(user, remember=True)
164
    session['name'] = user.name
165
    session['email'] = user.email
166
167
    return redirect(url_for('main.index'))
168
169
170
def get_authenticated_user_from_cookie() -> UserEntity | None:
171
    session_token = request.cookies.get('cbo_session_token')
172
    if not session_token:
173
        return None
174
    try:
175
        return sdk.sessions.validate_token(session_token)
176
    except:
177
        raise Unauthorized()