1
|
1 |
|
import functools |
2
|
1 |
|
import inspect |
3
|
1 |
|
from typing import Dict, Type, Union, Any, TypeVar, Callable |
4
|
|
|
|
5
|
1 |
|
from lagom.util.functional import arity |
6
|
1 |
|
from .exceptions import UnresolvableType, InvalidDependencyDefinition |
7
|
1 |
|
from .definitions import Resolver, Construction, Singleton, Alias, DEFINITION_TYPES |
8
|
|
|
|
9
|
1 |
|
UNRESOLVABLE_TYPES = [str, int, float, bool] |
10
|
|
|
|
11
|
1 |
|
X = TypeVar("X") |
12
|
|
|
|
13
|
1 |
|
DepDefinition = Any |
14
|
|
|
|
15
|
|
|
|
16
|
1 |
|
class Container: |
17
|
1 |
|
_registered_types: Dict[Type, Resolver] = {} |
18
|
|
|
|
19
|
1 |
|
def define(self, dep: Union[Type[X], Type], resolver: DepDefinition) -> None: |
20
|
1 |
|
self._registered_types[dep] = self._normalise(resolver) |
21
|
|
|
|
22
|
1 |
|
def resolve(self, dep_type: Type[X], suppress_error=False) -> X: |
23
|
1 |
|
try: |
24
|
1 |
|
if dep_type in UNRESOLVABLE_TYPES: |
25
|
1 |
|
raise UnresolvableType(f"Cannot construct type {dep_type}") |
26
|
1 |
|
registered_type = self._registered_types.get(dep_type, dep_type) |
27
|
1 |
|
if isinstance(registered_type, Singleton): |
28
|
1 |
|
return self._load_singleton(registered_type) |
29
|
1 |
|
return self._build(registered_type) |
30
|
1 |
|
except UnresolvableType as inner_error: |
31
|
1 |
|
if not suppress_error: |
32
|
1 |
|
raise UnresolvableType( |
33
|
|
|
f"Cannot construct type {dep_type.__name__}" |
34
|
|
|
) from inner_error |
35
|
1 |
|
return None # type: ignore |
36
|
|
|
|
37
|
1 |
|
def partial(self, func: Callable) -> Callable: |
38
|
1 |
|
spec = inspect.getfullargspec(func) |
39
|
1 |
|
bindable_deps = self._infer_dependencies(spec, suppress_error=True) |
40
|
1 |
|
return functools.partial(func, **bindable_deps) |
41
|
|
|
|
42
|
1 |
|
def __getitem__(self, dep: Type[X]) -> X: |
43
|
1 |
|
return self.resolve(dep) |
44
|
|
|
|
45
|
1 |
|
def __setitem__(self, dep: Type, resolver: DepDefinition): |
46
|
1 |
|
self.define(dep, resolver) |
47
|
|
|
|
48
|
1 |
|
def _normalise(self, resolver: DepDefinition) -> Resolver: |
49
|
1 |
|
if type(resolver) in DEFINITION_TYPES: |
50
|
1 |
|
return resolver |
51
|
1 |
|
elif inspect.isfunction(resolver): |
52
|
1 |
|
return self._build_lambda_constructor(resolver) |
53
|
1 |
|
elif not inspect.isclass(resolver): |
54
|
1 |
|
return Singleton(lambda: resolver) # type: ignore |
55
|
|
|
else: |
56
|
1 |
|
return Alias(resolver) |
57
|
|
|
|
58
|
1 |
|
def _build_lambda_constructor(self, resolver: Callable) -> Construction: |
59
|
1 |
|
artiy = arity(resolver) |
60
|
1 |
|
if artiy == 0: |
61
|
1 |
|
return Construction(resolver) |
62
|
1 |
|
if artiy == 1: |
63
|
1 |
|
return Construction(functools.partial(resolver, self)) |
64
|
1 |
|
raise InvalidDependencyDefinition(f"Arity {arity} functions are not supported") |
65
|
|
|
|
66
|
1 |
|
def _build(self, dep_type: Any) -> Any: |
67
|
1 |
|
if isinstance(dep_type, Alias): |
68
|
1 |
|
return self.resolve(dep_type.alias_type) |
69
|
1 |
|
if isinstance(dep_type, Construction): |
70
|
1 |
|
return dep_type.construct() |
71
|
1 |
|
return self._reflection_build(dep_type) |
72
|
|
|
|
73
|
1 |
|
def _reflection_build(self, dep_type: Type[X]) -> X: |
74
|
1 |
|
spec = inspect.getfullargspec(dep_type.__init__) |
75
|
1 |
|
sub_deps = self._infer_dependencies(spec) |
76
|
1 |
|
return dep_type(**sub_deps) # type: ignore |
77
|
|
|
|
78
|
1 |
|
def _infer_dependencies(self, spec: inspect.FullArgSpec, suppress_error=False): |
79
|
1 |
|
sub_deps = { |
80
|
|
|
key: self.resolve(sub_dep_type, suppress_error=suppress_error) |
81
|
|
|
for (key, sub_dep_type) in spec.annotations.items() |
82
|
|
|
} |
83
|
1 |
|
filtered_deps = {key: dep for (key, dep) in sub_deps.items() if dep is not None} |
84
|
1 |
|
return filtered_deps |
85
|
|
|
|
86
|
1 |
|
def _load_singleton(self, singleton: Singleton): |
87
|
1 |
|
if singleton.has_instance: |
88
|
1 |
|
return singleton.instance |
89
|
|
|
return singleton.set_instance(self._build(singleton.singleton_type)) |
90
|
|
|
|