token_refresh()   A
last analyzed

Complexity

Conditions 3

Size

Total Lines 27
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 20
dl 0
loc 27
rs 9.4
c 0
b 0
f 0
cc 3
nop 5
1
import hashlib
2
import json
3
import posixpath
4
import secrets
5
import webbrowser
6
from base64 import urlsafe_b64encode
7
from random import randrange
8
from urllib.parse import urlparse, parse_qs
9
10
import requests
11
12
from .auth_code_receiver import AuthCodeReceiver
13
14
15
def get_token(
16
        issuer_uri,
17
        client_id,
18
        redirect_uri=None,
19
        scopes=None,
20
        login_hint=None,
21
        domain_hint=None,
22
        browser_name=None
23
):
24
    state = str(randrange(1000000000))  # "InfoOfWhereTheUserWantedToGo"
25
    nonce = str(randrange(1000000000))
26
27
    # Your first step is to generate_key_pair a code verifier and challenge:
28
    # Code verifier: Random URL-safe string with a minimum length of 43 characters.
29
    # Code challenge: Base64 URL-encoded SHA-256 hash of the code verifier.
30
    code_verifier = secrets.token_urlsafe(43)
31
    m = hashlib.sha256()
32
    m.update(code_verifier.encode())
33
    code_challenge = urlsafe_b64encode(m.digest()).decode().rstrip("=")
34
35
    if redirect_uri is None:
36
        server = AuthCodeReceiver(port=0)
37
        port = server.get_port()
38
        redirect_uri = f"http://localhost:{port}/"
39
    else:
40
        server = AuthCodeReceiver(port=int(redirect_uri.split(":")[-1]))
41
42
    res = requests.get(
43
        url=posixpath.join(issuer_uri, "oauth2/v2.0/authorize"),
44
        params={
45
            "client_id": client_id,
46
            "response_type": "code",
47
            "scope": " ".join(scopes or []),
48
            "redirect_uri": redirect_uri,
49
            "state": state,
50
            "nonce": nonce,  # Optional
51
            "code_challenge_method": "S256",
52
            "code_challenge": code_challenge,
53
            "client_info": "1",
54
            "prompt": "select_account",
55
            "login_hint": login_hint or "",
56
            "domain_hint": domain_hint or ""
57
        }
58
    )
59
60
    assert res.status_code == 200
61
62
    if browser_name:
63
        webbrowser.get(browser_name).open(res.request.url)
64
    else:
65
        webbrowser.open(res.request.url)
66
67
    q = server.get_auth_response(timeout=1000, state=state)
68
    return _exchange_code(
69
        client_id=client_id,
70
        issuer_uri=issuer_uri,
71
        code_verifier=code_verifier,
72
        redirect_url=redirect_uri,
73
        code=q["code"]
74
    )
75
76
77
def get_token_manual(
78
        issuer_uri,
79
        client_id,
80
        redirect_uri=None,
81
        scopes=None,
82
        login_hint=None,
83
        domain_hint=None,
84
        browser_name=None
85
):
86
    state = str(randrange(1000000000))  # "InfoOfWhereTheUserWantedToGo"
87
    nonce = str(randrange(1000000000))
88
89
    # Your first step is to generate_key_pair a code verifier and challenge:
90
    # Code verifier: Random URL-safe string with a minimum length of 43 characters.
91
    # Code challenge: Base64 URL-encoded SHA-256 hash of the code verifier.
92
    code_verifier = secrets.token_urlsafe(43)
93
    m = hashlib.sha256()
94
    m.update(code_verifier.encode())
95
    code_challenge = urlsafe_b64encode(m.digest()).decode().rstrip("=")
96
97
    print({
98
        "code_verifier": code_verifier,
99
        "code_challenge": code_challenge
100
    })
101
102
    res = requests.get(
103
        url=posixpath.join(issuer_uri, "oauth2/v2.0/authorize"),
104
        params={
105
            "client_id": client_id,
106
            "response_type": "code",
107
            "scope": " ".join(scopes),
108
            "redirect_uri": redirect_uri,
109
            "state": state,
110
            "nonce": nonce,  # Optional
111
            "code_challenge_method": "S256",
112
            "code_challenge": code_challenge,
113
            # "client_info": "1",
114
            # "prompt": "select_account",
115
            # "login_hint": login_hint or "",
116
            # "domain_hint": domain_hint or ""
117
        }
118
    )
119
    assert res.status_code == 200
120
121
    if browser_name:
122
        webbrowser.get(browser_name).open(res.request.url)
123
    else:
124
        webbrowser.open(res.request.url)
125
126
    response = input('TYPE YOUR RETURN URI:')
127
    o = urlparse(response.rstrip())
128
    q = parse_qs(o.query)
129
130
    return _exchange_code(
131
        client_id=client_id,
132
        issuer_uri=issuer_uri,
133
        code_verifier=code_verifier,
134
        redirect_url=redirect_uri,
135
        code=q["code"]
136
    )
137
138
139
def token_refresh(
140
        refresh_token,
141
        issuer_uri,
142
        client_id,
143
        scopes=None,
144
        origin=None
145
):
146
    data = {
147
        "grant_type": "refresh_token",
148
        "client_id": client_id,
149
        "refresh_token": refresh_token
150
    }
151
    if scopes:
152
        data["scope"] = " ".join(scopes or [])
153
154
    token = requests.post(
155
        url=posixpath.join(issuer_uri, "oauth2/v2.0/token"),
156
        data=data,
157
        headers={
158
            'origin': origin
159
        },
160
    )
161
162
    if token.status_code == 200:
163
        return token.json()
164
165
    raise Exception(token.json())
166
167
168
def _exchange_code(issuer_uri, client_id, code_verifier, redirect_url, code):
169
    token = requests.post(
170
        url=posixpath.join(issuer_uri, "oauth2/v2.0/token"),
171
        data={
172
            "grant_type": "authorization_code",
173
            "client_id": client_id,
174
            "code_verifier": code_verifier,
175
            "redirect_uri": redirect_url,
176
            "code": code
177
        },
178
        headers={
179
            # 'origin': redirect_url  # This might be need for web apps but not for desktop apps
180
        },
181
    )
182
    if token.status_code == 200:
183
        return token.json()
184
185
    assert token.status_code == 200, token.text
186