Passed
Pull Request — master (#226)
by Steve
03:05
created

lagom.experimental.context_based   A

Complexity

Total Complexity 42

Size/Duplication

Total Lines 241
Duplicated Lines 17.43 %

Test Coverage

Coverage 92.37%

Importance

Changes 0
Metric Value
wmc 42
eloc 168
dl 42
loc 241
ccs 109
cts 118
cp 0.9237
rs 9.0399
c 0
b 0
f 0

17 Methods

Rating   Name   Duplication   Size   Complexity  
A AwaitableSingleton.get() 0 6 4
A AwaitableSingleton.__init__() 0 5 1
A _AsyncContextBoundFunction.__init__() 7 7 1
A AsyncContextContainer.__init__() 0 10 1
B AsyncContextContainer._context_type_def() 0 16 7
A AsyncContextContainer.__aexit__() 0 4 2
A _AsyncContextBoundFunction.__getattr__() 7 7 4
A AsyncContextContainer.magic_partial() 0 17 2
A AsyncContextContainer.partial() 0 15 2
A AsyncContextContainer._context_resolver() 0 9 1
A AsyncContextContainer._singleton_type_def() 0 8 2
A AsyncContextContainer.clone() 0 9 1
A _AsyncContextBoundFunction.__call__() 2 2 1
C AsyncContextContainer.__aenter__() 0 23 9
A _AsyncContextBoundFunction.__async_call__() 3 3 2
A AsyncContextContainer._async_context_resolver() 0 11 1
A _AsyncContextBoundFunction.rebind() 5 5 1

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like lagom.experimental.context_based often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 1
import inspect
2 1
import logging
3 1
from asyncio import Lock
4 1
from contextlib import AsyncExitStack
5 1
from copy import copy
6 1
from functools import wraps
7 1
from typing import (
8
    Optional,
9
    Type,
10
    TypeVar,
11
    Awaitable,
12
    Generic,
13
    Collection,
14
    Union,
15
    ContextManager,
16
    AsyncContextManager,
17
    Iterator,
18
    Generator,
19
    AsyncGenerator,
20
    Callable,
21
    List,
22
)
23
24 1
from lagom.container import Container
25 1
from lagom.definitions import Alias, ConstructionWithContainer, SingletonWrapper
26 1
from lagom.exceptions import InvalidDependencyDefinition, MissingFeature
27 1
from lagom.experimental.definitions import AsyncConstructionWithContainer
28 1
from lagom.interfaces import (
29
    ReadableContainer,
30
    SpecialDepDefinition,
31
    CallTimeContainerUpdate,
32
    ContainerBoundFunction,
33
)
34
35 1
T = TypeVar("T")
36 1
X = TypeVar("X")
37
38
39 1
class AwaitableSingleton(Generic[T]):
40 1
    instance: Optional[T]
41 1
    constructor: ConstructionWithContainer[Awaitable[T]]
42 1
    container: Container
43 1
    _lock: Lock
44
45 1
    def __init__(self, constructor: ConstructionWithContainer, container: Container):
46 1
        self.instance = None
47 1
        self.constructor = constructor  # type: ignore
48 1
        self.container = container
49 1
        self._lock = Lock()
50
51 1
    async def get(self) -> T:
52 1
        if not self.instance:
53 1
            async with self._lock:
54 1
                if not self.instance:
55 1
                    self.instance = await self.constructor.get_instance(self.container)
56 1
        return self.instance
57
58
59 1 View Code Duplication
class _AsyncContextBoundFunction(ContainerBoundFunction[X]):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
60
    """
61
    Represents an instance of a function bound to an async context container
62
    """
63
64 1
    __slots__ = ("async_context_container", "partially_bound_function")
65
66 1
    async_context_container: "AsyncContextContainer"
67 1
    partially_bound_function: ContainerBoundFunction
68
69 1
    __dict__ = dict()
70
71 1
    def __init__(
72
        self,
73
        async_context_container: "AsyncContextContainer",
74
        partially_bound_function: ContainerBoundFunction,
75
    ):
76 1
        self.async_context_container = async_context_container
77 1
        self.partially_bound_function = partially_bound_function
78
79 1
    def __call__(self, *args, **kwargs) -> X:
80 1
        return self.__async_call__(*args, **kwargs)
81
82 1
    async def __async_call__(self, *args, **kwargs):
83 1
        async with self.async_context_container as c:
84 1
            return await self.partially_bound_function.rebind(c)(*args, **kwargs)
85
86 1
    def rebind(self, container: ReadableContainer) -> "ContainerBoundFunction[X]":
87
        return wraps(self.partially_bound_function)(
88
            _AsyncContextBoundFunction(
89
                self.async_context_container,
90
                self.partially_bound_function.rebind(container),
91
            )
92
        )
93
94 1
    def __getattr__(self, item):
95
        if item not in self.__slots__:
96
            raise Exception(f"{item} doesn't exist")
97
        if item == "async_context_container":
98
            return self.async_context_container
99
        if item == "partially_bound_function":
100
            return self.partially_bound_function
101
102
103 1
class AsyncContextContainer(Container):
104 1
    async_exit_stack: Optional[AsyncExitStack] = None
105 1
    _context_types: Collection[Type]
106 1
    _context_singletons: Collection[Type]
107 1
    _root_context: bool = True
108
109 1
    def __init__(
110
        self,
111
        container: Container,
112
        context_types: Collection[Type],
113
        context_singletons: Collection[Type] = tuple(),
114
        log_undefined_deps: Union[bool, logging.Logger] = False,
115
    ):
116 1
        super().__init__(container, log_undefined_deps)
117 1
        self._context_types = set(context_types)
118 1
        self._context_singletons = set(context_singletons)
119
120 1
    def clone(self) -> "AsyncContextContainer":
121
        """returns a copy of the container
122
        :return:
123
        """
124 1
        return AsyncContextContainer(
125
            self,
126
            context_types=self._context_types,
127
            context_singletons=self._context_singletons,
128
            log_undefined_deps=self._undefined_logger,
129
        )
130
131 1
    async def __aenter__(self):
132 1
        if not self.async_exit_stack and self._root_context:
133 1
            self.async_exit_stack = AsyncExitStack()
134
135 1
        if self.async_exit_stack and self._root_context:
136
            # All actual context definitions happen on a clone so that there's isolation between invocations
137 1
            in_context = self.clone()
138 1
            in_context.async_exit_stack = AsyncExitStack()
139 1
            in_context._root_context = False
140
141 1
            for dep_type in self._context_types:
142 1
                managed_dep = self._context_type_def(dep_type)
143 1
                key = Awaitable[dep_type] if isinstance(managed_dep, AsyncConstructionWithContainer) else dep_type  # type: ignore
144 1
                in_context[key] = managed_dep  # type: ignore
145 1
            for dep_type in self._context_singletons:
146 1
                managed_singleton = self._singleton_type_def(dep_type)
147 1
                key = AwaitableSingleton[dep_type] if isinstance(managed_singleton, AwaitableSingleton) else dep_type  # type: ignore
148 1
                in_context[key] = managed_singleton  # type: ignore
149
150
            # The parent context manager keeps track of the inner clone
151 1
            await self.async_exit_stack.enter_async_context(in_context)
152 1
            return in_context
153 1
        return self
154
155 1
    async def __aexit__(self, exc_type, exc_val, exc_tb):
156 1
        if self.async_exit_stack:
157 1
            await self.async_exit_stack.aclose()
158 1
            self.async_exit_stack = None
159
160 1
    def partial(
161
        self,
162
        func: Callable[..., X],
163
        shared: Optional[List[Type]] = None,
164
        container_updater: Optional[CallTimeContainerUpdate] = None,
165
    ) -> ContainerBoundFunction[X]:
166 1
        if not inspect.iscoroutinefunction(func):
167
            raise MissingFeature(
168
                "AsyncContextManager currently can only deal with async functions"
169
            )
170 1
        base_partial = super(AsyncContextContainer, self).partial(
171
            func, shared, container_updater
172
        )
173
174 1
        return wraps(base_partial)(_AsyncContextBoundFunction(self, base_partial))
175
176 1
    def magic_partial(
177
        self,
178
        func: Callable[..., X],
179
        shared: Optional[List[Type]] = None,
180
        keys_to_skip: Optional[List[str]] = None,
181
        skip_pos_up_to: int = 0,
182
        container_updater: Optional[CallTimeContainerUpdate] = None,
183
    ) -> ContainerBoundFunction[X]:
184 1
        if not inspect.iscoroutinefunction(func):
185
            raise MissingFeature(
186
                "AsyncContextManager currently can only deal with async functions"
187
            )
188 1
        base_partial = super(AsyncContextContainer, self).magic_partial(
189
            func, shared, keys_to_skip, skip_pos_up_to, container_updater
190
        )
191
192 1
        return wraps(base_partial)(_AsyncContextBoundFunction(self, base_partial))
193
194 1
    def _context_type_def(self, dep_type: Type):
195 1
        type_def = self.get_definition(ContextManager[dep_type]) or self.get_definition(Iterator[dep_type]) or self.get_definition(Generator[dep_type, None, None]) or self.get_definition(AsyncGenerator[dep_type, None]) or self.get_definition(AsyncContextManager[dep_type])  # type: ignore
196 1
        if type_def is None:
197 1
            raise InvalidDependencyDefinition(
198
                f"A ContextManager[{dep_type}] should be defined. "
199
                f"This could be an Iterator[{dep_type}] or Generator[{dep_type}, None, None] "
200
                f"with the @contextmanager decorator"
201
            )
202 1
        if isinstance(type_def, Alias):
203
            # Without this we create a definition that points to
204
            # itself.
205 1
            type_def = copy(type_def)
206 1
            type_def.skip_definitions = True
207 1
        if self.get_definition(AsyncGenerator[dep_type, None]) or self.get_definition(AsyncContextManager[dep_type]):  # type: ignore
208 1
            return AsyncConstructionWithContainer(lambda c: self._async_context_resolver(c, type_def))  # type: ignore
209 1
        return ConstructionWithContainer(lambda c: self._context_resolver(c, type_def))  # type: ignore
210
211 1
    def _context_resolver(self, c: ReadableContainer, type_def: SpecialDepDefinition):
212
        """
213
        Takes an existing definition which must be a context manager. Returns
214
        the value of the context manager from __enter__ and then places the
215
        __exit__ in this container's exit stack
216
        """
217 1
        assert self.async_exit_stack, "Types can only be resolved within an async with"
218 1
        context_manager = type_def.get_instance(c)
219 1
        return self.async_exit_stack.enter_context(context_manager)
220
221 1
    def _async_context_resolver(
222
        self, c: ReadableContainer, type_def: SpecialDepDefinition
223
    ):
224
        """
225
        Takes an existing definition which must be a context manager. Returns
226
        the value of the context manager from __aenter__ and then places the
227
        __aexit__ in this container's exit stack
228
        """
229 1
        assert self.async_exit_stack, "Types can only be resolved within an async with"
230 1
        context_manager = type_def.get_instance(c)
231 1
        return self.async_exit_stack.enter_async_context(context_manager)
232
233 1
    def _singleton_type_def(self, dep_type: Type):
234
        """
235
        The same as context_type_def but acts as a singleton within this container
236
        """
237 1
        type_def = self._context_type_def(dep_type)
238 1
        if isinstance(type_def, AsyncConstructionWithContainer):
239 1
            return AwaitableSingleton(type_def, self)
240
        return SingletonWrapper(type_def)
241