1
|
|
|
import time |
2
|
|
|
|
3
|
|
|
from typing import Optional, Any |
4
|
|
|
from azure.core.credentials import TokenCredential, AccessToken |
5
|
|
|
|
6
|
|
|
from .jwt_util import load_jwt |
7
|
|
|
from .microsoft_oauth2 import get_token, token_refresh |
8
|
|
|
|
9
|
|
|
AZURE_CLI = "04b07795-8ddb-461a-bbee-02f9e1bf7b46" |
10
|
|
|
|
11
|
|
|
|
12
|
|
|
class CacheInteractiveBrowserCredential(TokenCredential): |
13
|
|
|
def __init__(self, cache, login_hint=None, browser_name=None): |
14
|
|
|
self.cache = cache |
15
|
|
|
self.login_hint = login_hint |
16
|
|
|
self.browser_name = browser_name |
17
|
|
|
|
18
|
|
|
"""Authenticates by requesting a token from the Azure CLI. |
19
|
|
|
|
20
|
|
|
This requires previously logging in to Azure via "az login", and will use the CLI's currently logged in identity. |
21
|
|
|
""" |
22
|
|
|
|
23
|
|
|
def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None, **kwargs: Any) -> AccessToken: |
24
|
|
|
if tenant_id is None: |
25
|
|
|
raise ValueError("Tenant is required") |
26
|
|
|
if len(scopes) != 1: |
27
|
|
|
raise ValueError("This credential requires exactly one scope per token request.") |
28
|
|
|
|
29
|
|
|
service_token = self.cache.get((tenant_id, scopes[0])) |
30
|
|
|
if not service_token: |
31
|
|
|
refresh_token = self.cache.get(tenant_id) |
32
|
|
|
if not refresh_token: |
33
|
|
|
refresh_token = get_token( |
34
|
|
|
issuer_uri="https://login.microsoftonline.com/f20a8dfd-ca10-4d0f-926d-e0a08b44bb15/", |
35
|
|
|
client_id=AZURE_CLI, |
36
|
|
|
scopes=['offline_access', 'openid', 'profile'], |
37
|
|
|
login_hint=self.login_hint, |
38
|
|
|
domain_hint=self.login_hint.split("@")[-1] if "@" in (self.login_hint or "") else None, |
39
|
|
|
browser_name=self.browser_name, |
40
|
|
|
)['refresh_token'] |
41
|
|
|
self.cache.set(tenant_id, refresh_token, expire=90*24*60*60) # expire=int(token["expires_in"]) |
42
|
|
|
|
43
|
|
|
# Tenant properties |
44
|
|
|
service_token = token_refresh( |
45
|
|
|
refresh_token=refresh_token, |
46
|
|
|
issuer_uri=f"https://sts.windows.net/{tenant_id}/", |
47
|
|
|
client_id=AZURE_CLI, |
48
|
|
|
scopes=scopes, |
49
|
|
|
) |
50
|
|
|
|
51
|
|
|
self.cache.set((tenant_id, scopes[0]), service_token, expire=service_token["expires_in"]) |
52
|
|
|
refresh_token = service_token["refresh_token"] |
53
|
|
|
self.cache.set(tenant_id, refresh_token, expire=90*24*60*60) |
54
|
|
|
|
55
|
|
|
# return service_token |
56
|
|
|
_, payload, _ = load_jwt(service_token["access_token"]) |
57
|
|
|
return AccessToken(service_token["access_token"], payload["exp"]) |
58
|
|
|
|