Passed
Pull Request — master (#213)
by Steve
02:50
created

lagom.container.Container.resolve()   A

Complexity

Conditions 4

Size

Total Lines 41
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 4

Importance

Changes 0
Metric Value
cc 4
eloc 10
nop 4
dl 0
loc 41
rs 9.9
c 0
b 0
f 0
ccs 9
cts 9
cp 1
crap 4
1 1
import functools
2 1
import io
3 1
import logging
4 1
import typing
5 1
from abc import ABCMeta
6
7 1
from types import FunctionType, MethodType
8 1
from typing import (
9
    Dict,
10
    Type,
11
    Any,
12
    TypeVar,
13
    Callable,
14
    Set,
15
    List,
16
    Optional,
17
    cast,
18
    Union,
19
)
20
21 1
from .definitions import (
22
    normalise,
23
    Singleton,
24
    Alias,
25
    ConstructionWithoutContainer,
26
    UnresolvableTypeDefinition,
27
)
28 1
from .exceptions import (
29
    UnresolvableType,
30
    DuplicateDefinition,
31
    InvalidDependencyDefinition,
32
    RecursiveDefinitionError,
33
    DependencyNotDefined,
34
    TypeOnlyAvailableAsAwaitable,
35
)
36 1
from .interfaces import (
37
    SpecialDepDefinition,
38
    WriteableContainer,
39
    TypeResolver,
40
    DefinitionsSource,
41
    ExtendableContainer,
42
    ContainerDebugInfo,
43
    CallTimeContainerUpdate,
44
)
45 1
from .markers import injectable
46 1
from .updaters import update_container_singletons
47 1
from .util.logging import NullLogger
48 1
from .util.reflection import (
49
    FunctionSpec,
50
    CachingReflector,
51
    remove_optional_type,
52
    remove_awaitable_type,
53
)
54 1
from .wrapping import apply_argument_updater
55
56 1
UNRESOLVABLE_TYPES = [
57
    str,
58
    int,
59
    float,
60
    bool,
61
    bytes,
62
    bytearray,
63
    io.BytesIO,
64
    io.BufferedIOBase,
65
    io.BufferedRandom,
66
    io.BufferedReader,
67
    io.BufferedRWPair,
68
    io.BufferedWriter,
69
    io.FileIO,
70
    io.IOBase,
71
    io.RawIOBase,
72
    io.TextIOBase,
73
    typing.IO,
74
    typing.TextIO,
75
    typing.BinaryIO,
76
]
77
78 1
X = TypeVar("X")
79
80 1
Unset: Any = object()
81
82
83 1
class Container(
84
    WriteableContainer, ExtendableContainer, DefinitionsSource, ContainerDebugInfo
85
):
86
    """Dependency injection container
87
88
    Lagom is a dependency injection container designed to give you "just enough"
89
    help with building your dependencies. The intention is that almost
90
    all of your code doesn't know about or rely on lagom. Lagom will
91
    only be involved at the top level to pull everything together.
92
93
    >>> from tests.examples import SomeClass
94
    >>> c = Container()
95
    >>> c[SomeClass]
96
    <tests.examples.SomeClass object at ...>
97
98
    Objects are constructed as they are needed
99
100
    >>> from tests.examples import SomeClass
101
    >>> c = Container()
102
    >>> first = c[SomeClass]
103
    >>> second = c[SomeClass]
104
    >>> first != second
105
    True
106
107
    And construction logic can be defined
108
    >>> from tests.examples import SomeClass, SomeExtendedClass
109
    >>> c = Container()
110
    >>> c[SomeClass] = SomeExtendedClass
111
    >>> c[SomeClass]
112
    <tests.examples.SomeExtendedClass object at ...>
113
    """
114
115 1
    _registered_types: Dict[Type, SpecialDepDefinition]
116 1
    _parent_definitions: DefinitionsSource
117 1
    _reflector: CachingReflector
118 1
    _undefined_logger: logging.Logger
119
120 1
    def __init_subclass__(cls, **kwargs):
121 1
        super().__init_subclass__(**kwargs)
122 1
        if not hasattr(cls, "_lagom_class") and not isinstance(cls, ABCMeta):
123
            raise TypeError(
124
                f"Container can not be subclassed - on some platforms this is a compiled class "
125
                f"and it would be impossible: {type(cls)} with {dir(cls)}"
126
            )
127
128 1
    def __init__(
129
        self,
130
        container: Optional["Container"] = None,
131
        log_undefined_deps: Union[bool, logging.Logger] = False,
132
    ):
133
        """
134
        :param container: Optional container if provided the existing definitions will be copied
135
        :param log_undefined_deps indicates if a log message should be emmited when an undefined dep is loaded
136
        """
137
138
        # ContainerDebugInfo is always registered
139
        # This means consumers can consume an overview of the container
140
        # without hacking anything custom together.
141 1
        self._registered_types = {
142
            ContainerDebugInfo: ConstructionWithoutContainer(lambda: self)
143
        }
144
145 1
        if container:
146 1
            self._parent_definitions = container
147 1
            self._reflector = container._reflector
148
        else:
149 1
            self._parent_definitions = EmptyDefinitionSet()
150 1
            self._reflector = CachingReflector()
151
152 1
        if not log_undefined_deps:
153 1
            self._undefined_logger = NullLogger()
154 1
        elif log_undefined_deps is True:
155 1
            self._undefined_logger = logging.getLogger(__name__)
156
        else:
157 1
            self._undefined_logger = cast(logging.Logger, log_undefined_deps)
158
159 1
    def define(self, dep: Type[X], resolver: TypeResolver[X]) -> SpecialDepDefinition:
160
        """Register how to construct an object of type X
161
162
        >>> from tests.examples import SomeClass
163
        >>> c = Container()
164
        >>> c.define(SomeClass, lambda: SomeClass())
165
        <lagom.definitions.ConstructionWithoutContainer ...>
166
167
        :param dep: The type to be constructed
168
        :param resolver: A definition of how to construct it
169
        :return:
170
        """
171 1
        if dep in UNRESOLVABLE_TYPES:
172
            raise InvalidDependencyDefinition()
173 1
        if dep in self._registered_types:
174 1
            raise DuplicateDefinition()
175 1
        if dep is resolver:
176
            # This is a special case for things like container[Foo] = Foo
177 1
            return self.define(dep, Alias(dep, skip_definitions=True))
178 1
        definition = normalise(resolver)
179 1
        self._registered_types[dep] = definition
180 1
        self._registered_types[Optional[dep]] = definition  # type: ignore
181
182
        # For awaitables we add a convenience exception to be thrown if code hints on the type
183
        # without the awaitable.
184 1
        awaitable_type = remove_awaitable_type(dep)
185 1
        if awaitable_type:
186
            # Unless there's already a sync version defined.
187 1
            if awaitable_type not in self.defined_types:
188 1
                self._registered_types[awaitable_type] = UnresolvableTypeDefinition(
189
                    TypeOnlyAvailableAsAwaitable(awaitable_type)
190
                )
191 1
        return definition
192
193 1
    @property
194 1
    def defined_types(self) -> Set[Type]:
195
        """The types the container has explicit build instructions for
196
197
        :return:
198
        """
199 1
        return self._parent_definitions.defined_types.union(
200
            self._registered_types.keys()
201
        )
202
203 1
    @property
204 1
    def reflection_cache_overview(self) -> Dict[str, str]:
205 1
        return self._reflector.overview_of_cache
206
207 1
    def temporary_singletons(
208
        self, singletons: List[Type] = None
209
    ) -> "_TemporaryInjectionContext":
210
        """
211
        Returns a context that loads a new container with singletons that only exist
212
        for the context.
213
214
        >>> from tests.examples import SomeClass
215
        >>> base_container = Container()
216
        >>> def my_func():
217
        ...     with base_container.temporary_singletons([SomeClass]) as c:
218
        ...         assert c[SomeClass] is c[SomeClass]
219
        >>> my_func()
220
221
        :param singletons: items which should be considered singletons within the context
222
        :return:
223
        """
224 1
        updater = (
225
            functools.partial(update_container_singletons, singletons=singletons)
226
            if singletons
227
            else None
228
        )
229 1
        return _TemporaryInjectionContext(self, updater)
230
231 1
    def resolve(
232
        self, dep_type: Type[X], suppress_error=False, skip_definitions=False
233
    ) -> X:
234
        """Constructs an object of type X
235
236
         If the object can't be constructed an exception will be raised unless
237
         supress errors is true
238
239
        >>> from tests.examples import SomeClass
240
        >>> c = Container()
241
        >>> c.resolve(SomeClass)
242
        <tests.examples.SomeClass object at ...>
243
244
        >>> from tests.examples import SomeClass
245
        >>> c = Container()
246
        >>> c.resolve(int)
247
        Traceback (most recent call last):
248
        ...
249
        lagom.exceptions.UnresolvableType: ...
250
251
        Optional wrappers are stripped out to be what is being asked for
252
        >>> from tests.examples import SomeClass
253
        >>> c = Container()
254
        >>> c.resolve(Optional[SomeClass])
255
        <tests.examples.SomeClass object at ...>
256
257
        :param dep_type: The type of object to construct
258
        :param suppress_error: if true returns None on failure
259
        :param skip_definitions:
260
        :return:
261
        """
262 1
        if not skip_definitions:
263 1
            definition = self.get_definition(dep_type)
264 1
            if definition:
265 1
                return definition.get_instance(self)
266
267 1
        optional_dep_type = remove_optional_type(dep_type)
268 1
        if optional_dep_type:
269 1
            return self.resolve(optional_dep_type, suppress_error=True)
270
271 1
        return self._reflection_build_with_err_handling(dep_type, suppress_error)
272
273 1
    def partial(
274
        self,
275
        func: Callable[..., X],
276
        shared: List[Type] = None,
277
        container_updater: Optional[CallTimeContainerUpdate] = None,
278
    ) -> Callable[..., X]:
279
        """Takes a callable and returns a callable bound to the container
280
        When invoking the new callable if any arguments have a default set
281
        to the special marker object "injectable" then they will be constructed by
282
        the container. For automatic injection without the marker use "magic_partial"
283
        >>> from tests.examples import SomeClass
284
        >>> c = Container()
285
        >>> def my_func(something: SomeClass = injectable):
286
        ...     return f"Successfully called with {something}"
287
        >>> bound_func = c.magic_partial(my_func)
288
        >>> bound_func()
289
        'Successfully called with <tests.examples.SomeClass object at ...>'
290
291
        :param func: the function to bind to the container
292
        :param shared: items which should be considered singletons on a per call level
293
        :param container_updater: An optional callable to update the container before resolution
294
        :return:
295
        """
296 1
        spec = self._get_spec_without_self(func)
297 1
        keys_to_bind = (
298
            key for (key, arg) in spec.defaults.items() if arg is injectable
299
        )
300 1
        keys_and_types = [(key, spec.annotations[key]) for key in keys_to_bind]
301
302 1
        _injection_context = self.temporary_singletons(shared)
303 1
        update_container = container_updater if container_updater else _update_nothing
304
305 1
        def _update_args(supplied_args, supplied_kwargs):
306 1
            keys_to_skip = set(supplied_kwargs.keys())
307 1
            keys_to_skip.update(spec.args[0 : len(supplied_args)])
308 1
            with _injection_context as invocation_container:
309 1
                update_container(invocation_container, supplied_args, supplied_kwargs)
310 1
                kwargs = {
311
                    key: invocation_container.resolve(dep_type)
312
                    for (key, dep_type) in keys_and_types
313
                    if key not in keys_to_skip
314
                }
315 1
            kwargs.update(supplied_kwargs)
316 1
            return supplied_args, kwargs
317
318 1
        return apply_argument_updater(func, _update_args, spec)
319
320 1
    def magic_partial(
321
        self,
322
        func: Callable[..., X],
323
        shared: List[Type] = None,
324
        keys_to_skip: List[str] = None,
325
        skip_pos_up_to: int = 0,
326
        container_updater: Optional[CallTimeContainerUpdate] = None,
327
    ) -> Callable[..., X]:
328
        """Takes a callable and returns a callable bound to the container
329
        When invoking the new callable if any arguments can be constructed by the container
330
        then they can be ommited.
331
        >>> from tests.examples import SomeClass
332
        >>> c = Container()
333
        >>> def my_func(something: SomeClass):
334
        ...   return f"Successfully called with {something}"
335
        >>> bound_func = c.magic_partial(my_func)
336
        >>> bound_func()
337
        'Successfully called with <tests.examples.SomeClass object at ...>'
338
339
        :param func: the function to bind to the container
340
        :param shared: items which should be considered singletons on a per call level
341
        :param keys_to_skip: named arguments which the container shouldnt build
342
        :param skip_pos_up_to: positional arguments which the container shouldnt build
343
        :param container_updater: An optional callable to update the container before resolution
344
        :return:
345
        """
346 1
        spec = self._get_spec_without_self(func)
347
348 1
        update_container = container_updater if container_updater else _update_nothing
349 1
        _injection_context = self.temporary_singletons(shared)
350
351 1
        def _update_args(supplied_args, supplied_kwargs):
352 1
            final_keys_to_skip = (keys_to_skip or []) + list(supplied_kwargs.keys())
353 1
            final_skip_pos_up_to = max(skip_pos_up_to, len(supplied_args))
354 1
            with _injection_context as invocation_container:
355 1
                update_container(invocation_container, supplied_args, supplied_kwargs)
356 1
                kwargs = invocation_container._infer_dependencies(
357
                    spec,
358
                    suppress_error=True,
359
                    keys_to_skip=final_keys_to_skip,
360
                    skip_pos_up_to=final_skip_pos_up_to,
361
                )
362 1
            kwargs.update(supplied_kwargs)
363 1
            return supplied_args, kwargs
364
365 1
        return apply_argument_updater(func, _update_args, spec, catch_errors=True)
366
367 1
    def clone(self) -> "Container":
368
        """returns a copy of the container
369
        :return:
370
        """
371 1
        return Container(self, log_undefined_deps=self._undefined_logger)
372
373 1
    def get_definition(self, dep_type: Type[X]) -> Optional[SpecialDepDefinition[X]]:
374
        """
375
        Will return the definition in this container. If none has been defined any
376
        definition in the parent container will be used.
377
378
        :param dep_type:
379
        :return:
380
        """
381 1
        definition = self._registered_types.get(dep_type, Unset)
382 1
        if definition is Unset:
383 1
            return self._parent_definitions.get_definition(dep_type)
384 1
        return definition
385
386 1
    def __getitem__(self, dep: Type[X]) -> X:
387 1
        return self.resolve(dep)
388
389 1
    def __setitem__(self, dep: Type[X], resolver: TypeResolver[X]):
390 1
        self.define(dep, resolver)
391
392 1
    def _reflection_build_with_err_handling(
393
        self, dep_type: Type[X], suppress_error: bool
394
    ) -> X:
395 1
        try:
396 1
            if dep_type in UNRESOLVABLE_TYPES:
397 1
                raise UnresolvableType(dep_type)
398 1
            return self._reflection_build(dep_type)
399 1
        except UnresolvableType as inner_error:
400 1
            if not suppress_error:
401 1
                raise UnresolvableType(dep_type) from inner_error
402 1
            return None  # type: ignore
403 1
        except RecursionError as recursion_error:
404
            raise RecursiveDefinitionError(dep_type) from recursion_error
405
406 1
    def _reflection_build(self, dep_type: Type[X]) -> X:
407 1
        self._undefined_logger.warning(
408
            f"Undefined dependency. Using reflection for {dep_type}",
409
            extra={"undefined_dependency": dep_type},
410
        )
411 1
        spec = self._reflector.get_function_spec(dep_type.__init__)
412 1
        sub_deps = self._infer_dependencies(spec, types_to_skip={dep_type})
413 1
        try:
414 1
            return dep_type(**sub_deps)  # type: ignore
415 1
        except TypeError as type_error:
416 1
            raise UnresolvableType(dep_type) from type_error
417
418 1
    def _infer_dependencies(
419
        self,
420
        spec: FunctionSpec,
421
        suppress_error=False,
422
        keys_to_skip: List[str] = None,
423
        skip_pos_up_to=0,
424
        types_to_skip: Set[Type] = None,
425
    ):
426 1
        dep_keys_to_skip: List[str] = []
427 1
        dep_keys_to_skip.extend(spec.args[0:skip_pos_up_to])
428 1
        dep_keys_to_skip.extend(keys_to_skip or [])
429 1
        types_to_skip = types_to_skip or set()
430 1
        sub_deps = {
431
            key: self.resolve(sub_dep_type, suppress_error=suppress_error)
432
            for (key, sub_dep_type) in spec.annotations.items()
433
            if sub_dep_type != Any
434
            and (key not in dep_keys_to_skip)
435
            and (sub_dep_type not in types_to_skip)
436
        }
437 1
        return {key: dep for (key, dep) in sub_deps.items() if dep is not None}
438
439 1
    def _get_spec_without_self(self, func: Callable[..., X]) -> FunctionSpec:
440 1
        if isinstance(func, (FunctionType, MethodType)):
441 1
            return self._reflector.get_function_spec(func)
442 1
        t = cast(Type[X], func)
443 1
        return self._reflector.get_function_spec(t.__init__).without_argument("self")
444
445
446 1
class ExplicitContainer(Container):
447 1
    _lagom_class: typing.ClassVar[bool] = True
448
449 1
    def resolve(
450
        self, dep_type: Type[X], suppress_error=False, skip_definitions=False
451
    ) -> X:
452 1
        definition = self.get_definition(dep_type)
453 1
        if not definition:
454 1
            if suppress_error:
455 1
                return None  # type: ignore
456 1
            raise DependencyNotDefined(dep_type)
457 1
        return definition.get_instance(self)
458
459 1
    def define(self, dep, resolver):
460 1
        definition = super().define(dep, resolver)
461 1
        if isinstance(definition, Alias):
462 1
            raise InvalidDependencyDefinition(
463
                "Aliases are not valid in an explicit container"
464
            )
465 1
        if isinstance(definition, Singleton) and isinstance(
466
            definition.singleton_type, Alias
467
        ):
468 1
            raise InvalidDependencyDefinition(
469
                "Aliases are not valid inside singletons in an explicit container"
470
            )
471 1
        return definition
472
473 1
    def clone(self):
474
        """returns a copy of the container
475
        :return:
476
        """
477 1
        return ExplicitContainer(self, log_undefined_deps=self._undefined_logger)
478
479
480 1
class EmptyDefinitionSet(DefinitionsSource):
481
    """
482
    Represents the starting state for a collection of dependency definitions
483
    i.e. None and everything has to be built with reflection
484
    """
485
486 1
    def get_definition(self, dep_type: Type[X]) -> Optional[SpecialDepDefinition[X]]:
487
        """
488
        No types are defined in the empty set
489
        :param dep_type:
490
        :return:
491
        """
492 1
        return None
493
494 1
    @property
495 1
    def defined_types(self) -> Set[Type]:
496 1
        return set()
497
498
499 1
class _TemporaryInjectionContext:
500 1
    _base_container: Container
501
502 1
    def __init__(
503
        self,
504
        container: Container,
505
        update_function: Optional[Callable[[Container], Container]] = None,
506
    ):
507 1
        self._base_container = container
508 1
        if update_function:
509 1
            self._build_temporary_container = lambda: update_function(
510
                self._base_container
511
            )
512
        else:
513 1
            self._build_temporary_container = lambda: self._base_container.clone()
514
515 1
    def __enter__(self) -> Container:
516 1
        return self._build_temporary_container()
517
518 1
    def __exit__(self, exc_type, exc_val, exc_tb):
519 1
        pass
520
521
522 1
def _update_nothing(_c: WriteableContainer, _a: typing.Collection, _k: Dict):
523
    return None
524