1
|
|
|
"""Filters for usage in pathfinding.""" |
2
|
|
|
|
3
|
1 |
|
from dataclasses import dataclass |
4
|
1 |
|
from typing import Any, Callable, Iterable, Union |
5
|
|
|
|
6
|
1 |
|
# One graph edge: two strings followed by a data dict (the dict sits at index 2).
# NOTE(review): presumably (source node, target node, attributes) - confirm
# against the code that produces these tuples.
EdgeData = tuple[str, str, dict]
# Any iterable of edges; it is not required to be a list or to be re-iterable.
EdgeList = Iterable[EdgeData]

# Callable that extracts a value from an edge. EdgeFilter invokes it as
# ``extractor(edge, value)``, where ``value`` is the value being compared
# against; the result is handed to the filter's comparator.
EdgeDataExtractor = Callable[[EdgeData, Any], Any]
10
|
|
|
|
11
|
1 |
|
class EdgeFilter:
    """Reusable filter that keeps the edges matching a comparison value.

    For every edge, ``get_func`` extracts a value which ``comparator`` then
    compares against the (optionally preprocessed) value given at call time.

    Args:
        comparator: Called as ``comparator(extracted, value)``; the edge is
            kept when the result is truthy.
        get_func: Either a callable invoked as ``get_func(edge, value)``, or
            the name of a key to look up in the edge's data dict
            (``edge[2]``). With a key name, an edge lacking that key yields
            the comparison value itself, so the comparator receives the same
            object for both arguments.
        preprocessor: Optional callable applied to the comparison value once
            per call, before any edge is examined.
    """

    def __init__(
        self,
        comparator: Callable[[Any, Any], bool],
        get_func: "Union[EdgeDataExtractor, str]",
        preprocessor: Union[Callable[[Any], Any], None] = None,
    ):
        self.comparator = comparator

        # A plain string is shorthand for "read this key from the edge data".
        if isinstance(get_func, str):
            get_func = self._key_extractor(get_func)

        self.get_func = get_func
        self.preprocessor = preprocessor

    @staticmethod
    def _key_extractor(key: str) -> Callable[[Any, Any], Any]:
        """Build an extractor that reads ``key`` from an edge's data dict."""

        def extract(edge, val):
            # ``val`` doubles as the default when the key is absent.
            return edge[2].get(key, val)

        return extract

    def __call__(
        self,
        value: Any,
        items: "EdgeList",
    ) -> "EdgeList":
        """Apply the filter given the items and value.

        The preprocessor (if any) runs immediately, so any exception it
        raises surfaces at call time. The edges themselves are examined
        lazily, as the returned iterable is consumed.

        Args:
            value: The value each edge's extracted data is compared against.
            items: The edges to filter.

        Returns:
            A lazy iterable over the edges for which the comparator returned
            a truthy result, in their original order.
        """
        # Compare against None explicitly: a callable object may itself be
        # falsy (e.g. it defines __len__) and must still be applied.
        if self.preprocessor is not None:
            value = self.preprocessor(value)

        return (
            edge
            for edge in items
            if self.comparator(self.get_func(edge, value), value)
        )
49
|
|
|
|
50
|
1 |
|
@dataclass
class TypeCheckPreprocessor:
    """Preprocessor that validates a value's type before passing it on.

    Instances are callable: the value is checked with ``isinstance`` against
    ``types`` and then, if a wrapped ``preprocessor`` was supplied, handed to
    it.
    """

    # A single type, or a tuple of types of any length, as accepted by
    # ``isinstance``.
    types: Union[type, tuple[type, ...]]
    # Optional callable applied to the value after the type check passes.
    preprocessor: Union[Callable[[Any], Any], None] = None

    def __call__(
        self,
        value: Any,
    ) -> Any:
        """Check ``value`` against ``types`` and preprocess it.

        Args:
            value: The value to validate.

        Returns:
            The wrapped preprocessor's result when one is configured,
            otherwise ``value`` unchanged.

        Raises:
            TypeError: If ``value`` is not an instance of ``types``.
        """
        if not isinstance(value, self.types):
            raise TypeError(f"Expected type: {self.types}")

        # Compare against None explicitly: a callable object may itself be
        # falsy (e.g. it defines __len__) and must still be applied.
        if self.preprocessor is None:
            return value
        return self.preprocessor(value)
64
|
|
|
|