_AuthCodeHttpServer.handle_timeout()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.125

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 4
ccs 1
cts 2
cp 0.5
rs 10
c 0
b 0
f 0
cc 1
nop 1
crap 1.125
1
# from https://github.com/AzureAD/microsoft-authentication-library-for-python
2
# modified to not use threading so as to wait for response
3 1
import logging
4 1
import sys
5 1
from http.server import HTTPServer, BaseHTTPRequestHandler
6 1
from string import Template
7 1
from urllib.parse import urlparse, parse_qs
8
9 1
# Module-level logger; the request handler and receiver in this file emit
# their debug diagnostics through it.
logger = logging.getLogger(__name__)
10
11
12 1
def _qs2kv(qs):
    """Flatten parse_qs()'s single-item lists into the item itself.

    A value that is a list holding exactly one element is replaced by that
    element; every other value (empty list, multi-item list, non-list) is
    passed through unchanged.
    """
    return {
        key: value[0] if isinstance(value, list) and len(value) == 1 else value
        for key, value in qs.items()
    }
16
17
18
# Pages written back to the browser by _AuthCodeHandler.
# error_template's placeholders are filled from the auth response parameters.
error_template = Template("Authentication failed. $error: $error_description. ($error_uri)")
welcome_template = "Welcome"

# The success page is HTML (not plain text) so that its inline script can
# close the browser window 3 seconds after authentication completes.
success_template = Template('<html><body>Authentication completed. You can close this window now.<script>setTimeout("window.close()",3000)</script></body></html>')
23
24
25 1
class _AuthCodeHandler(BaseHTTPRequestHandler):
    """Request handler that captures an auth response from the query string.

    A GET whose query string carries ``code`` or ``error`` is treated as the
    auth response: it is stored on ``self.server.auth_response`` and a
    success or error page is returned. Any other GET gets the welcome page.
    """

    def do_GET(self):
        # For flexibility, we choose to not check self.path matching redirect_uri
        # assert self.path.startswith('/THE_PATH_REGISTERED_BY_THE_APP')
        qs = parse_qs(urlparse(self.path).query)
        if qs.get('code') or qs.get("error"):  # So, it is an auth response
            # Store the raw (unescaped) values; the receiver hands them to the caller.
            self.server.auth_response = _qs2kv(qs)
            logger.debug("Got auth response: %s", self.server.auth_response)
            template = success_template if "code" in qs else error_template
            # The page is served as text/html and the values come straight
            # from the request, so escape them before rendering.
            self._send_full_response(template.safe_substitute(
                self._escape_values(self.server.auth_response)))
            # NOTE: Don't do self.server.shutdown() here. It'll halt the server.
        else:
            self._send_full_response(welcome_template)

    @staticmethod
    def _escape_values(params):
        """Return a copy of ``params`` with every value HTML-escaped.

        Values are converted with ``str()`` first, so a multi-valued
        parameter (a list) is rendered as its escaped string form.
        """
        from html import escape  # local import keeps this block self-contained
        return {key: escape(str(value)) for key, value in params.items()}

    def _send_full_response(self, body, is_ok=True):
        """Send ``body`` as a complete HTML response.

        The status is 200 when ``is_ok`` is true, 400 otherwise; the body is
        encoded as UTF-8.
        """
        self.send_response(200 if is_ok else 400)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        self.wfile.write(body.encode("utf-8"))

    def log_message(self, format, *args):
        logger.debug(format, *args)  # To override the default log-to-stderr behavior
51
52
53 1
def is_wsl():
    """Return True when running on Linux under Windows Subsystem for Linux."""
    # "Official" way of detecting WSL: https://github.com/Microsoft/WSL/issues/423#issuecomment-221627364
    # Run `uname -a` to get 'release' without python
    #   - WSL 1: '4.4.0-19041-Microsoft'
    #   - WSL 2: '4.19.128-microsoft-standard'
    import platform
    uname = platform.uname()
    # Compare case-insensitively: WSL 1 and WSL 2 capitalise "microsoft" differently.
    return uname.system.lower() == 'linux' and 'microsoft' in uname.release.lower()
63
64
65 1
class _AuthCodeHttpServer(HTTPServer):
    """HTTPServer that raises on timeout and avoids port sharing on Windows/WSL."""

    # address_family = socket.AF_INET6 # if using ipv6
    def __init__(self, server_address, *args, **kwargs):
        # Index rather than unpack, so longer address tuples (e.g. the
        # 4-tuple used with AF_INET6) are accepted as well.
        port = server_address[1]
        if port and (sys.platform == "win32" or is_wsl()):
            # The default allow_reuse_address is True. It works fine on non-Windows.
            # On Windows, it undesirably allows multiple servers listening on same port,
            # yet the second server would not receive any incoming request.
            # So, we need to turn it off.
            self.allow_reuse_address = False
        super().__init__(server_address, *args, **kwargs)

    def handle_timeout(self):
        """Abort the wait by raising RuntimeError.

        Raises:
            RuntimeError: always; no request arrived within self.timeout seconds.
        """
        # It will be triggered when no request comes in self.timeout seconds.
        # See https://docs.python.org/3/library/socketserver.html#socketserver.BaseServer.handle_timeout
        raise RuntimeError("Timeout. No auth response arrived.")  # Terminates this server
        # We choose to not call self.server_close() here,
        # because it would cause a socket.error exception in handle_request(),
        # and likely end up the server being server_close() twice.
84
85
86 1
class AuthCodeReceiver(object):
    """Listen on 127.0.0.1 for the redirect that carries the auth response.

    Usable as a context manager; leaving the ``with`` block calls close().
    """

    # This class has (rather than is) an _AuthCodeHttpServer, so it does not leak API
    def __init__(self, port=None):
        """Bind the server to ``port``; a falsy port is passed to the server as 0."""
        address = "127.0.0.1"
        self._server = _AuthCodeHttpServer((address, port or 0), _AuthCodeHandler)
        self._closing = False

    def get_port(self):
        """The port this server actually listening to"""
        # https://docs.python.org/2.7/library/socketserver.html#SocketServer.BaseServer.server_address
        return self._server.server_address[1]

    def get_auth_response(self, timeout=None, state=None):
        """Block until an auth response arrives, then return it.

        Args:
            timeout: seconds to wait for each request; assigned to the
                server's ``timeout`` so its handle_timeout() can fire.
            state: when truthy, responses whose "state" differs are ignored.

        Returns:
            The auth response as a dict, or None if none was received.

        An exception raised while handling a request (such as the
        RuntimeError from the server's handle_timeout()) propagates to the
        caller, and the server is not closed in that case; call close().
        """
        self._server.timeout = timeout  # Otherwise its handle_timeout() won't work
        self._server.auth_response = {}  # Shared with _AuthCodeHandler
        while not self._closing:  # Stop polling once close() has been called
            self._server.handle_request()
            if self._server.auth_response:
                if state and state != self._server.auth_response.get("state"):
                    logger.debug("State mismatch. Ignoring this noise.")
                    # Discard it, so a mismatched response can never be
                    # returned if the loop ends without a valid one.
                    self._server.auth_response = {}
                else:
                    break

        response = self._server.auth_response
        self._server.server_close()
        return response or None  # None when nothing was received

    def close(self):
        """Either call this eventually; or use the entire class as context manager"""
        self._closing = True
        self._server.server_close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
123