aws_alb_oauth_proxy.helpers._aws_region()   A
Complexity: Conditions 2
Size: Total Lines 15, Code Lines 7
Duplication: Lines 0, Ratio 0 %
Importance: Changes 0

Metric  Value
cc      2
eloc    7
nop     0
dl      0
loc     15
rs      10
c       0
b       0
f       0

import asyncio

Coding Style: This module should have a docstring.

The coding style of this project requires that you add a docstring to this code element. Below is an example for methods:

class SomeClass:
    def some_method(self):
        """Do x and return foo."""

To learn more about docstrings, we recommend reading PEP-257: Docstring Conventions.

from concurrent.futures import TimeoutError

Bug, Best Practice: This seems to re-define the built-in TimeoutError.

Redefining built-ins is generally discouraged because it makes code hard to read.

import json
import logging
from typing import Optional

import aiohttp
from aiohttp import web
from multidict import CIMultiDictProxy


logger = logging.getLogger(__name__)


DEFAULT_REMOVED_RESPONSE_HEADERS = {"Content-Length", "Content-Encoding", "Transfer-Encoding"}


def clean_response_headers(request: web.Request) -> CIMultiDictProxy:
    """Remove HTTP headers from an upstream response and add the auth header if present.

    :param request: A web.Request containing the request whose headers are to be cleaned.
    :return: A CIMultiDictProxy containing the clean headers.
    """
    # Work on a mutable copy so the headers held by the request stay untouched.
    clean_headers = request.headers.copy()
    for header in DEFAULT_REMOVED_RESPONSE_HEADERS:
        # popall drops every occurrence; the default avoids a KeyError when the header is absent.
        clean_headers.popall(header, None)
    try:
        # The value is unpacked into add(), so it is expected to be a (name, value) pair.
        auth_header = request.pop("auth_payload")
    except KeyError:
        pass
    else:
        clean_headers.add(*auth_header)
    return CIMultiDictProxy(clean_headers)


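The filtering done by `clean_response_headers` can be tried without aiohttp or multidict. What follows is a minimal sketch rather than the project's code: it substitutes a plain list of `(name, value)` pairs for the multidict, and `strip_headers`, `REMOVED`, and the sample headers are hypothetical names and values chosen for illustration.

```python
from typing import Iterable, List, Optional, Tuple

Header = Tuple[str, str]

# Same header names as DEFAULT_REMOVED_RESPONSE_HEADERS in the listing.
REMOVED = {"Content-Length", "Content-Encoding", "Transfer-Encoding"}


def strip_headers(
    headers: Iterable[Header],
    removed: Iterable[str] = REMOVED,
    auth_header: Optional[Header] = None,
) -> List[Header]:
    """Drop every header named in `removed` (case-insensitively), then append auth_header."""
    removed_lower = {name.lower() for name in removed}
    kept = [(name, value) for name, value in headers if name.lower() not in removed_lower]
    if auth_header is not None:
        kept.append(auth_header)
    return kept


# Made-up upstream headers, including one in lower case to show the case-insensitive match.
upstream = [
    ("content-length", "42"),
    ("Content-Type", "text/html"),
    ("Transfer-Encoding", "chunked"),
]
cleaned = strip_headers(upstream, auth_header=("Authorization", "Bearer example-token"))
```

The lower-casing step mirrors what a case-insensitive multidict does implicitly, which is why the listing can pass the canonical header names to `popall` directly.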
async def _instance_document() -> Optional[str]:
    """Fetch the EC2 instance identity document and return its region.

    This stands in for a single |aiohttp.request|_ call, which ``_aws_region`` then runs synchronously.

    As only one request is done per proxy, there is normally no need to use a session.
    There is however a bug (`#3628`_) in ``aiohttp`` that leaks the session when an exception is raised.
    The manual session handling for only one request is a workaround while waiting for `PR #3640`_ to be merged.

    :return: The region name as a string, or None if the request timed out.

    .. |aiohttp.request| replace:: ``aiohttp.request``
    .. _aiohttp.request: https://docs.aiohttp.org/en/latest/client_reference.html#aiohttp.request
    .. _#3628: https://github.com/aio-libs/aiohttp/issues/3628
    .. _PR #3640: https://github.com/aio-libs/aiohttp/pull/3640
    """
    async with aiohttp.ClientSession(raise_for_status=True, timeout=aiohttp.ClientTimeout(total=1)) as session:
        try:
            async with session.get("http://169.254.169.254/latest/dynamic/instance-identity/document") as response:
                document = await response.text()
        # aiohttp raises asyncio.TimeoutError on a client timeout; on Python 3.8 to 3.10
        # that class is distinct from concurrent.futures.TimeoutError.
        except asyncio.TimeoutError:
            logger.debug("Timeout while attempting to get instance document.")
            return None
        else:
            return json.loads(document)["region"]


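The timeout branch of `_instance_document` can be exercised without a network. The sketch below is an illustration only: `asyncio.wait_for` stands in for the aiohttp client timeout, and `slow_fetch` and `fetch_or_none` are hypothetical helpers. Referring to the exception through its module, as `asyncio.TimeoutError`, also avoids shadowing the built-in name that the reviewer's note flags.

```python
import asyncio
from typing import Optional


async def slow_fetch(delay: float) -> str:
    """Hypothetical stand-in for the HTTP call; it only sleeps and returns text."""
    await asyncio.sleep(delay)
    return '{"region": "example-region"}'


async def fetch_or_none(delay: float, timeout: float) -> Optional[str]:
    """Return the fetched text, or None when the call exceeds `timeout` seconds."""
    try:
        return await asyncio.wait_for(slow_fetch(delay), timeout=timeout)
    except asyncio.TimeoutError:
        # Qualified name: no import that redefines the built-in TimeoutError is needed.
        return None


timed_out = asyncio.run(fetch_or_none(delay=0.5, timeout=0.01))
completed = asyncio.run(fetch_or_none(delay=0.0, timeout=1.0))
```

Here `timed_out` is `None` because the stand-in call outlasts its timeout, while `completed` holds the returned text.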
def _aws_region() -> Optional[str]:
    """Attempts to query the AWS region where this instance is running.

    Returns None if endpoint is not available, which means we're probably not running on AWS.

    `Related Amazon docs <https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instance-identity-documents.html>`_
    """

    try:
        event_loop = asyncio.get_event_loop()
    except RuntimeError:
        event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(event_loop)

    return event_loop.run_until_complete(_instance_document())
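The pattern in `_aws_region`, driving a coroutine to completion from synchronous code, can be sketched in isolation. This is a simplified variant under stated assumptions, not the function above: it always creates and closes its own event loop instead of reusing the current one, and `fake_instance_document`, `region_sync`, and `SAMPLE_DOCUMENT` are hypothetical names with made-up data.

```python
import asyncio
import json
from typing import Optional

# Made-up sample; a real identity document carries more fields than "region".
SAMPLE_DOCUMENT = '{"region": "example-region"}'


async def fake_instance_document() -> Optional[str]:
    """Hypothetical replacement for _instance_document that needs no network."""
    await asyncio.sleep(0)  # yield to the loop once, as a real request would
    return json.loads(SAMPLE_DOCUMENT)["region"]


def region_sync() -> Optional[str]:
    """Run the coroutine to completion on a loop created just for this call."""
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(fake_instance_document())
    finally:
        # Closing the loop releases its resources once the single call is done.
        loop.close()


region = region_sync()
```

Creating a dedicated loop sidesteps the question of whether a current loop exists, at the cost of not sharing a loop with other code in the same thread.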