Passed
Push — main ( 0829d8...17286a )
by Sat CFDI
02:52
created

CacheInteractiveBrowserCredential.get_token()   B

Complexity

Conditions 6

Size

Total Lines 35
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 28
dl 0
loc 35
rs 8.2746
c 0
b 0
f 0
cc 6
nop 5
1
import time
2
3
from typing import Optional, Any
4
from azure.core.credentials import TokenCredential, AccessToken
5
6
from .jwt_util import load_jwt
7
from .microsoft_oauth2 import get_token, token_refresh
8
9
AZURE_CLI = "04b07795-8ddb-461a-bbee-02f9e1bf7b46"
10
11
12
class CacheInteractiveBrowserCredential(TokenCredential):
    """Token credential backed by an interactive sign-in and a token cache.

    Access tokens are cached per ``(tenant_id, scope)`` and refresh tokens
    per ``tenant_id``. When no refresh token is cached, the sign-in
    implemented by ``microsoft_oauth2.get_token`` is run (it receives
    ``login_hint`` and ``browser_name``) using the Azure CLI client id.

    Args:
        cache: Object exposing ``get(key)`` and
            ``set(key, value, expire=seconds)``.
        login_hint: Optional account name forwarded to the sign-in. When it
            contains ``"@"``, the part after the last ``"@"`` is also sent
            as the domain hint.
        browser_name: Optional browser identifier forwarded to the sign-in.
    """

    # Lifetime, in seconds, given to cached refresh tokens (90 days).
    _REFRESH_TOKEN_TTL = 90 * 24 * 60 * 60

    def __init__(self, cache, login_hint=None, browser_name=None):
        self.cache = cache
        self.login_hint = login_hint
        self.browser_name = browser_name

    def get_token(
        self,
        *scopes: str,
        claims: Optional[str] = None,
        tenant_id: Optional[str] = None,
        **kwargs: Any
    ) -> AccessToken:
        """Return an access token for a single scope in the given tenant.

        Args:
            *scopes: Exactly one scope string.
            claims: Accepted for signature compatibility; not used here.
            tenant_id: Tenant the token is requested for. Required.
            **kwargs: Accepted for signature compatibility; not used here.

        Returns:
            An ``AccessToken`` holding the raw access token and the ``exp``
            claim read from its JWT payload.

        Raises:
            ValueError: If ``tenant_id`` is ``None`` or the number of scopes
                is not exactly one.
        """
        if tenant_id is None:
            raise ValueError("Tenant is required")
        if len(scopes) != 1:
            raise ValueError("This credential requires exactly one scope per token request.")

        access_key = (tenant_id, scopes[0])
        service_token = self.cache.get(access_key)
        if not service_token:
            refresh_token = self.cache.get(tenant_id)
            if not refresh_token:
                # Nothing cached for this tenant: run the interactive sign-in
                # once and keep the refresh token it returns.
                refresh_token = get_token(
                    issuer_uri="https://login.microsoftonline.com/f20a8dfd-ca10-4d0f-926d-e0a08b44bb15/",
                    client_id=AZURE_CLI,
                    scopes=['offline_access', 'openid', 'profile'],
                    login_hint=self.login_hint,
                    domain_hint=self.login_hint.split("@")[-1] if "@" in (self.login_hint or "") else None,
                    browser_name=self.browser_name,
                )['refresh_token']
                self.cache.set(tenant_id, refresh_token, expire=self._REFRESH_TOKEN_TTL)

            # Exchange the refresh token for an access token scoped to the
            # requested tenant and resource.
            service_token = token_refresh(
                refresh_token=refresh_token,
                issuer_uri=f"https://sts.windows.net/{tenant_id}/",
                client_id=AZURE_CLI,
                scopes=scopes,
            )

            # Some token endpoints return expires_in as a string; normalise
            # it so the cache always receives a number of seconds.
            self.cache.set(access_key, service_token, expire=int(service_token["expires_in"]))

            # The response may omit a rotated refresh token; in that case
            # keep the one already cached instead of failing with KeyError.
            rotated_refresh_token = service_token.get("refresh_token")
            if rotated_refresh_token:
                self.cache.set(tenant_id, rotated_refresh_token, expire=self._REFRESH_TOKEN_TTL)

        # The expiry handed to callers comes from the token's own "exp" claim.
        _, payload, _ = load_jwt(service_token["access_token"])
        return AccessToken(service_token["access_token"], payload["exp"])
58