Test Failed
Pull Request — master (#65)
by
unknown
06:49
created

build.graph.filters   A

Complexity

Total Complexity 18

Size/Duplication

Total Lines 110
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 80
dl 0
loc 110
rs 10
c 0
b 0
f 0
wmc 18

7 Methods

Rating   Name   Duplication   Size   Complexity  
A UseDefaultIfNone.__call__() 0 5 2
A UseValIfNone.__call__() 0 5 2
A ProcessEdgeAttribute.__call__() 0 8 3
A EdgeFilter.__init__() 0 14 2
A TypeDifferentiatedProcessor.__call__() 0 10 4
A TypeCheckPreprocessor.__call__() 0 7 2
A EdgeFilter.__call__() 0 19 3
1
"""Filters for usage in pathfinding."""
2
3
from dataclasses import dataclass
4
from typing import Any, Callable, Iterable, Union
5
6
# A single edge as a 3-tuple. Elsewhere in this module items 0 and 1 are
# printed as the edge's two endpoints and item 2 is read (via ``.get``) as the
# edge's attribute/metadata dict.
EdgeData = tuple[str, str, dict]
7
# Any iterable of edges; only iteration is required of it.
EdgeList = Iterable[EdgeData]
8
9
# Callable invoked as ``extractor(edge, value)`` that returns the edge-side
# value to be compared against ``value``.
EdgeDataExtractor = Callable[[EdgeData, Any], Any]
10
11
@dataclass
class UseValIfNone:
    """Edge extractor that falls back to the caller-supplied value.

    Wraps ``processor``, a callable taking an edge. When the processor yields
    ``None`` for an edge, the ``val`` given to the call is returned instead.
    """

    processor: Callable[[EdgeData], Any]

    def __call__(self, edge: EdgeData, val: Any):
        """Return ``processor(edge)``, or ``val`` when that result is ``None``."""
        extracted = self.processor(edge)
        # Only None triggers the fallback; other falsy results (0, "", ...)
        # are returned as they are.
        return val if extracted is None else extracted
20
    
21
@dataclass
class UseDefaultIfNone:
    """Edge extractor that falls back to a fixed default.

    Wraps ``processor``, a callable taking an edge. When the processor yields
    ``None`` for an edge, ``default`` is returned instead.
    """

    processor: Callable[[EdgeData], Any]
    default: Any

    def __call__(self, edge: EdgeData, _):
        """Return ``processor(edge)``, or ``default`` when that result is ``None``.

        The second positional argument is accepted and ignored.
        """
        extracted = self.processor(edge)
        # Only None triggers the fallback; other falsy results are kept.
        return self.default if extracted is None else extracted
31
32
@dataclass
class ProcessEdgeAttribute:
    """Read one attribute from an edge's data dict, optionally transforming it.

    ``attribute`` is the key looked up in the edge's third item. ``processor``,
    when truthy, is applied to the looked-up value (including ``None`` when the
    key is absent).
    """

    attribute: str
    processor: Callable[[Any], Any] = None

    def __call__(self, edge: EdgeData):
        """Return the (optionally processed) attribute value for ``edge``.

        Raises:
            TypeError: if ``processor`` raises ``TypeError``; the new error
                names the edge's first two items, the attribute and the
                offending value, and chains the original error.
        """
        result = edge[2].get(self.attribute)
        try:
            # Without a processor the raw lookup result is returned unchanged.
            return self.processor(result) if self.processor else result
        except TypeError as err:
            raise TypeError(f'({edge[0]}, {edge[1]}) has invalid metadata "{self.attribute}" with value {result!r}: {err}') from err
45
46
class EdgeFilter:
    """Callable that filters edges by comparing an edge-derived value to a target.

    For each edge, ``get_func(edge, value)`` produces the edge-side value and
    ``comparator(edge_value, value)`` decides whether the edge is kept.
    """

    def __init__(
        self,
        comparator: Callable[[Any, Any], bool],
        get_func: Union[EdgeDataExtractor, str],
        preprocessor: Callable[[Any], Any] = None
    ):
        """Store the comparator, extractor and optional value preprocessor.

        A string ``get_func`` is taken as an edge attribute name and wrapped
        as ``UseValIfNone(ProcessEdgeAttribute(name))``, so an edge whose
        attribute lookup yields ``None`` is compared using the target value
        itself.
        """
        self.comparator = comparator
        self.get_func = (
            UseValIfNone(ProcessEdgeAttribute(get_func))
            if isinstance(get_func, str)
            else get_func
        )
        self.preprocessor = preprocessor

    def __call__(
        self,
        value: Any,
        items: EdgeList
    ) -> EdgeList:
        """Apply the filter given the items and value."""
        # Normalise the target value once, before any edge is examined.
        if self.preprocessor:
            value = self.preprocessor(value)

        def keep(edge):
            # Edge-side value first, target value second.
            return self.comparator(self.get_func(edge, value), value)

        # Lazy: edges are only examined as the result is iterated.
        return filter(keep, items)
83
84
@dataclass
class TypeCheckPreprocessor:
    """Preprocessor that validates a value's type and passes it through.

    ``types`` is whatever ``isinstance`` accepts as its second argument: a
    single type or a tuple of types.
    """

    types: Union[type, tuple[type]]

    def __call__(
        self,
        value: Any
    ):
        """Return ``value`` unchanged if it is an instance of ``types``.

        Raises:
            TypeError: if ``value`` is not an instance of ``types``.
        """
        if isinstance(value, self.types):
            return value
        raise TypeError(f"Expected type: {self.types}")
95
96
@dataclass
class TypeDifferentiatedProcessor:
    """Dispatch a value to the processor registered for its type.

    ``preprocessors`` maps a type (or tuple of types) to a callable. Entries
    are tried in the mapping's iteration order and the first key for which
    ``isinstance(value, key)`` holds wins. A falsy processor (e.g. ``None``)
    means matching values are returned unchanged.
    """

    preprocessors: Union[
        dict[Union[type, tuple[type, ...]], Callable[[Any], Any]], None
    ] = None

    def __call__(
        self,
        value: Any
    ):
        """Return ``value`` processed by the first matching entry.

        Raises:
            TypeError: if no registered type matches ``value``, including
                when no mapping was supplied at all.
        """
        # The field defaults to None; treat that as an empty registry so the
        # caller gets the TypeError below instead of an AttributeError from
        # calling .items() on None.
        registry = self.preprocessors or {}
        for expected_types, processor in registry.items():
            if isinstance(value, expected_types):
                return processor(value) if processor else value
        raise TypeError(f"Expected types: {list(registry)}")
110