Passed
Push — main ( 0829d8...17286a )
by Sat CFDI
02:52
created

satdigitalinvoice.microsoft_auth.cache_interactive_browser   A

Complexity

Total Complexity 7

Size/Duplication

Total Lines 58
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 7
eloc 40
dl 0
loc 58
rs 10
c 0
b 0
f 0

2 Methods

Rating   Name   Duplication   Size   Complexity  
B CacheInteractiveBrowserCredential.get_token() 0 35 6
A CacheInteractiveBrowserCredential.__init__() 0 4 1
1
import time
2
3
from typing import Optional, Any
4
from azure.core.credentials import TokenCredential, AccessToken
5
6
from .jwt_util import load_jwt
7
from .microsoft_oauth2 import get_token, token_refresh
8
9
# OAuth2 client (application) ID sent with every login and refresh request in
# this module. NOTE(review): the name indicates this is the Azure CLI's public
# client registration - confirm before changing.
AZURE_CLI = "04b07795-8ddb-461a-bbee-02f9e1bf7b46"
10
11
12
class CacheInteractiveBrowserCredential(TokenCredential):
    """Token credential backed by an interactive login and a token cache.

    Two kinds of entries are kept in ``cache``:

    * ``tenant_id`` -> refresh token (string), kept for 90 days;
    * ``(tenant_id, scope)`` -> the full token response returned by
      ``token_refresh``, kept for the response's ``expires_in`` seconds.

    The interactive login (``get_token`` from ``.microsoft_oauth2``) is only
    performed when no refresh token is cached for the requested tenant.
    NOTE(review): the login helper presumably opens a browser window, given
    the ``browser_name`` argument - confirm against ``microsoft_oauth2``.
    """

    # Lifetime, in seconds, applied to cached refresh tokens (90 days).
    _REFRESH_TOKEN_TTL = 90 * 24 * 60 * 60

    def __init__(self, cache: Any, login_hint: Optional[str] = None, browser_name: Optional[str] = None):
        """Store the cache and the interactive-login preferences.

        :param cache: Object exposing ``get(key)`` (falsy result on a miss) and
            ``set(key, value, expire=seconds)``.
        :param login_hint: Optional account name forwarded to the login helper;
            when it contains ``"@"`` the part after the last ``"@"`` is also
            sent as the domain hint.
        :param browser_name: Optional browser identifier forwarded to the
            login helper.
        """
        self.cache = cache
        self.login_hint = login_hint
        self.browser_name = browser_name

    def _interactive_refresh_token(self) -> str:
        """Run the interactive login and return the refresh token it yields."""
        hint = self.login_hint
        domain_hint = hint.split("@")[-1] if hint and "@" in hint else None
        # NOTE(review): the login authority is pinned to a single tenant ID,
        # independent of the tenant requested in get_token().
        login_response = get_token(
            issuer_uri="https://login.microsoftonline.com/f20a8dfd-ca10-4d0f-926d-e0a08b44bb15/",
            client_id=AZURE_CLI,
            scopes=['offline_access', 'openid', 'profile'],
            login_hint=hint,
            domain_hint=domain_hint,
            browser_name=self.browser_name,
        )
        return login_response['refresh_token']

    def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None, **kwargs: Any) -> AccessToken:
        """Return an access token for exactly one scope in ``tenant_id``.

        A cached token response for ``(tenant_id, scope)`` is reused when
        present. Otherwise a refresh token (cached, or freshly obtained via
        interactive login) is exchanged through ``token_refresh`` and both the
        new token response and the refresh token are written to the cache.

        :param scopes: Exactly one scope string.
        :param claims: Accepted for interface compatibility; not used.
        :param tenant_id: Tenant to request the token for; required.
        :param kwargs: Accepted for interface compatibility; not used.
        :return: ``AccessToken`` whose expiry is the ``exp`` claim of the JWT.
        :raises ValueError: If ``tenant_id`` is ``None`` or the number of
            scopes is not exactly one.
        """
        if tenant_id is None:
            raise ValueError("Tenant is required")
        if len(scopes) != 1:
            raise ValueError("This credential requires exactly one scope per token request.")

        token_key = (tenant_id, scopes[0])
        service_token = self.cache.get(token_key)
        if not service_token:
            refresh_token = self.cache.get(tenant_id)
            if not refresh_token:
                refresh_token = self._interactive_refresh_token()
                self.cache.set(tenant_id, refresh_token, expire=self._REFRESH_TOKEN_TTL)

            # Exchange the refresh token for a token scoped to this tenant.
            service_token = token_refresh(
                refresh_token=refresh_token,
                issuer_uri=f"https://sts.windows.net/{tenant_id}/",
                client_id=AZURE_CLI,
                scopes=scopes,
            )

            # expires_in may be delivered as a numeric string; normalise it so
            # the cache always receives a number of seconds.
            self.cache.set(token_key, service_token, expire=int(service_token["expires_in"]))

            # Persist the rotated refresh token when the response carries one;
            # otherwise keep using the one already held instead of failing.
            refresh_token = service_token.get("refresh_token") or refresh_token
            self.cache.set(tenant_id, refresh_token, expire=self._REFRESH_TOKEN_TTL)

        access_token = service_token["access_token"]
        _, payload, _ = load_jwt(access_token)
        return AccessToken(access_token, payload["exp"])
58