Total Complexity | 55 |
Total Lines | 281 |
Duplicated Lines | 7.12 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like mandos.model.pubchem_support._nav often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | from __future__ import annotations |
||
|
|||
2 | from dataclasses import dataclass |
||
3 | from typing import ( |
||
4 | List, |
||
5 | Union, |
||
6 | Sequence, |
||
7 | Callable, |
||
8 | Set, |
||
9 | FrozenSet, |
||
10 | Any, |
||
11 | Optional, |
||
12 | Type, |
||
13 | TypeVar, |
||
14 | Iterable, |
||
15 | ) |
||
16 | |||
17 | from pocketutils.core.dot_dict import NestedDotDict |
||
18 | from pocketutils.tools.base_tools import BaseTools |
||
19 | |||
20 | from mandos.model.pubchem_support._nav_model import FilterFn |
||
21 | |||
22 | |||
# Generic placeholders; they appear only in the type annotations of this module.
T = TypeVar("T", covariant=True)
V = TypeVar("V", covariant=True)

# Exception types that the navigator operators intercept and re-raise as a
# NavError subclass. KeyError is already a LookupError; it is listed explicitly anyway.
catchable_errors = (
    KeyError,
    ValueError,
    LookupError,
    TypeError,
)
||
27 | |||
28 | |||
class NavError(Exception):
    """Base class of the errors raised by the navigator operators in this module."""


class MapError(NavError):
    """Raised when the non-filter branch of a ``/`` operator fails."""


class FlatmapError(NavError):
    """Raised when a ``//`` or ``>>`` extraction/conversion fails."""


class FilterError(NavError):
    """Raised when the filter branch of a ``/`` operator fails."""
||
43 | |||
44 | |||
def _identity(x: T) -> T:
    """Return ``x`` unchanged (the conversion used when no conversion is requested)."""
    return x
||
47 | |||
48 | |||
# NOTE: nav_utils happens to contain a duplicate of this function;
# it is repeated here deliberately to avoid depending on that module.
def _request_only(things: Iterable[str]) -> Optional[str]:
    """
    Return the one stripped string contained in ``things``.

    ``None`` entries are skipped and every remaining entry is stripped of
    surrounding whitespace.

    Returns:
        The single remaining string, or ``None`` if nothing remains.

    Raises:
        ValueError: If more than one entry remains.
    """
    # TODO: Did I mean to exclude None here?
    stripped = [item.strip() for item in things if item is not None]
    if not stripped:
        return None
    if len(stripped) == 1:
        return stripped[0]
    raise ValueError(f"{len(stripped)} items in {stripped}")
||
60 | |||
61 | |||
def _get_conversion_fn(fn: Union[None, str, Callable[[Any], Any]]) -> Callable[[Any], Any]:
    """
    Resolve a conversion specifier to a callable.

    Args:
        fn: ``None``, a string, or a callable.

    Returns:
        ``_identity`` for ``None``; ``_request_only`` for any string (the string's
        value itself is not inspected); otherwise ``fn`` as given.
    """
    if fn is None:
        return _identity
    return _request_only if isinstance(fn, str) else fn
||
69 | |||
70 | |||
@dataclass(frozen=True, eq=True)
class AbstractJsonNavigator:
    """Common base of the navigator dataclasses; declares no fields or methods itself."""
||
74 | |||
75 | |||
@dataclass(frozen=True, eq=True)
class JsonNavigator(AbstractJsonNavigator):
    """
    An immutable list of ``NestedDotDict`` items that is traversed with operators.

    Operators:
        - ``nav / key``: step into ``key`` of every item, or filter the items
          when ``key`` is a ``FilterFn``.
        - ``nav % key``: index the items by their value for ``key``.
        - ``nav // keys``: pull several keys out of every item
          (gives a ``JsonNavigatorListOfLists``).
        - ``nav >> key``: pull one key out of every item
          (gives a ``JsonNavigatorListOfOptionals``).
    """

    contents: List[NestedDotDict]

    @classmethod
    def create(
        cls, dct: Union[dict, NestedDotDict, Sequence[dict], Sequence[NestedDotDict]]
    ) -> JsonNavigator:
        """
        Build a navigator from one mapping or from a sequence of mappings.

        Anything that has an ``items`` attribute is treated as a single mapping.
        Each mapping is copied into a new ``NestedDotDict`` with an extra
        ``_landmark`` key set to ``""``.
        """
        if hasattr(dct, "items"):
            dct = [dct]
        return JsonNavigator([NestedDotDict(dict(**d, _landmark="")) for d in dct])

    @property
    def get(self) -> List[NestedDotDict]:
        """The wrapped list of items."""
        return self.contents

    def __truediv__(
        self, key: Union[int, str, FilterFn, Callable[[NestedDotDict], NestedDotDict]]
    ) -> JsonNavigator:
        """
        Filter the items (``key`` is a ``FilterFn``) or step into ``key`` of each item.

        Raises:
            FilterError: If filtering raises one of ``catchable_errors``.
            MapError: If stepping inside raises one of ``catchable_errors``.
        """
        if isinstance(key, FilterFn):
            try:
                return self._filter(key)
            except catchable_errors as e:
                # Chain explicitly so the original failure stays attached as __cause__.
                raise FilterError(f"Failed to go filter navigator with '{key}': {e}") from e
        try:
            return self._go_inside(key)
        except catchable_errors as e:
            raise MapError(f"Failed to map navigator with '{key}': {e}") from e

    def __mod__(self, key: Union[int, str]) -> JsonNavigator:
        """
        Index the items by their value for ``key``.

        Returns:
            A navigator holding one ``NestedDotDict`` that maps each item's
            ``key`` value to the item itself.

        Raises:
            ValueError: If two items share the same value for ``key``.
        """
        new = {}
        for z in self.contents:
            index = z[key]
            if index in new:
                raise ValueError(f"{key} found twice")
            new[index] = z
        return JsonNavigator([NestedDotDict(new)])

    def __floordiv__(self, keys: Sequence[str]) -> JsonNavigatorListOfLists:
        """
        Extract ``keys`` from every item, giving one inner list per item.

        Raises:
            FlatmapError: If the extraction raises one of ``catchable_errors``.
        """
        try:
            return JsonNavigatorListOfLists([[z.get(key) for key in keys] for z in self.contents])
        except catchable_errors as e:
            raise FlatmapError(f"Failed to flatmap from navigator with '{keys}': {e}") from e

    def __rshift__(self, key: str) -> JsonNavigatorListOfOptionals:
        """
        Extract ``key`` from every item, giving one value per item.

        Raises:
            FlatmapError: If the extraction raises one of ``catchable_errors``.
        """
        try:
            return JsonNavigatorListOfOptionals([z.get(key) for z in self.contents])
        except catchable_errors as e:
            raise FlatmapError(
                f"Failed to 'double-flatmap' from navigator with '{key}': {e}"
            ) from e

    def _filter(self, keep_where: FilterFn) -> JsonNavigator:
        """
        Keep only the items selected by ``keep_where``.

        ``keep_where`` is either a predicate called with each item, or a
        ``(key, values)`` pair; in the latter case an item is kept when
        ``item.get(key)`` is in ``values``. A ``values`` that is not a set,
        frozenset or list is treated as a single allowed value.
        """
        if callable(keep_where):
            return JsonNavigator([z for z in self.contents if keep_where(z)])
        key, values = keep_where
        # Builtin types instead of the deprecated typing aliases; same isinstance result.
        if not isinstance(values, (set, frozenset, list)):
            values = {values}
        return JsonNavigator([z for z in self.contents if z.get(key) in values])

    def _go_inside(self, key: Union[int, str]) -> JsonNavigator:
        """
        Replace every item by what it holds under ``key``.

        A list value contributes one new item per element; a mapping value
        contributes a single new item. Items that do not contain ``key`` are
        dropped without error.

        Raises:
            ValueError: If a value under ``key`` is neither a list nor a mapping.
        """
        new = []
        for z in self.contents:
            if key not in z:
                continue
            value = z[key]
            if isinstance(value, list):
                new.extend(NestedDotDict(dict(**m)) for m in value)
            elif isinstance(value, (NestedDotDict, dict)):
                new.append(NestedDotDict(dict(**value)))
            else:
                raise ValueError(f"{key} value is {type(value)}: {value}")
        return JsonNavigator(new)
||
150 | |||
151 | |||
@dataclass(frozen=True, eq=True)
class JsonNavigatorListOfLists(AbstractJsonNavigator):
    """
    An immutable list of lists of extracted values.

    Operators:
        - ``nav / keys``: filter the inner lists (``keys`` is a ``FilterFn``) or
          convert their elements position by position.
        - ``nav // conversion``: collapse each inner list to one value
          (gives a ``JsonNavigatorListOfOptionals``).
        - ``nav >> conversion``: collapse the whole list of lists to one value
          (gives a ``JsonNavigatorSingleOptional``).
    """

    contents: List[List[Any]]

    def __truediv__(
        self,
        keys: Union[
            Sequence[Union[None, str, Callable[[Any], Any]]], FilterFn, Callable[[List[T]], Any]
        ],
    ) -> JsonNavigatorListOfLists:
        """
        Filter the inner lists or convert their elements.

        Raises:
            FilterError: If filtering raises one of ``catchable_errors``.
            MapError: If converting raises one of ``catchable_errors``.
        """
        if isinstance(keys, FilterFn):
            try:
                return self._filter(keys)
            except catchable_errors as e:
                # Chain explicitly so the original failure stays attached as __cause__.
                raise FilterError(f"Failed to filter list-of-lists with '{keys}': {e}") from e
        try:
            return self._go_inside(keys)
        except catchable_errors as e:
            raise MapError(f"Failed to map list-of-lists with '{keys}': {e}") from e

    def __rshift__(self, conversion: Callable[[List[List[T]]], Any]) -> JsonNavigatorSingleOptional:
        """
        Apply ``conversion`` once to the entire list of lists.

        Raises:
            FlatmapError: If ``conversion`` raises one of ``catchable_errors``.
        """
        try:
            return JsonNavigatorSingleOptional(conversion(self.contents))
        except catchable_errors as e:
            raise FlatmapError(
                f"Failed to 'double-flatmap' from list-of-lists with '{conversion}': {e}"
            ) from e

    def __floordiv__(self, conversion: Callable[[List[T]], Any]) -> JsonNavigatorListOfOptionals:
        """
        Apply ``conversion`` to each inner list separately.

        Raises:
            FlatmapError: If ``conversion`` raises one of ``catchable_errors``.
        """
        try:
            return JsonNavigatorListOfOptionals([conversion(z) for z in self.contents])
        except catchable_errors as e:
            raise FlatmapError(
                f"Failed to flatmap from list-of-lists with '{conversion}': {e}"
            ) from e

    def _filter(self, keep_if: FilterFn) -> JsonNavigatorListOfLists:
        """Keep the inner lists for which ``keep_if(inner_list)`` is truthy."""
        return JsonNavigatorListOfLists([z for z in self.contents if keep_if(z)])

    def _go_inside(
        self, keys: Sequence[Union[None, str, Callable[[Any], Any]]]
    ) -> JsonNavigatorListOfLists:
        """
        Convert each inner list element with the conversion at the same position.

        Every entry of ``keys`` is resolved through ``_get_conversion_fn``.
        Values and conversions are paired by ``BaseTools.zip_list``
        (presumably a length-checked zip -- confirm against pocketutils).
        """
        fns = [_get_conversion_fn(fn) for fn in keys]
        return JsonNavigatorListOfLists(
            [
                [fn(value) for value, fn in BaseTools.zip_list(contents, fns)]
                for contents in self.contents
            ]
        )
||
200 | |||
201 | |||
@dataclass(frozen=True, eq=True)
class JsonNavigatorListOfOptionals(AbstractJsonNavigator):
    """
    An immutable list of values, any of which may be ``None``.

    Operators:
        - ``nav / key``: filter the values (``key`` is a ``FilterFn``) or convert
          every non-``None`` value.
        - ``nav // fn`` and ``nav >> fn``: collapse the whole list to one value
          (gives a ``JsonNavigatorSingleOptional``).
    """

    contents: List[Optional[T]]

    @property
    def to_list(self) -> List[T]:
        """The values that are not ``None``, in their original order."""
        return [k for k in self.contents if k is not None]

    @property
    def to_set(self) -> FrozenSet[T]:
        """The distinct values that are not ``None``."""
        return frozenset(k for k in self.contents if k is not None)

    def __truediv__(
        self, key: Union[None, str, FilterFn, Callable[[Any], Any]]
    ) -> JsonNavigatorListOfOptionals:
        """
        Filter the values or convert each non-``None`` value.

        Raises:
            FilterError: If filtering raises one of ``catchable_errors``.
            FlatmapError: If converting raises one of ``catchable_errors``.
                (This class reports mapping failures as ``FlatmapError``; the
                type is kept so existing ``except`` clauses keep working.)
        """
        if isinstance(key, FilterFn):
            try:
                return self._filter(key)
            except catchable_errors as e:
                raise FilterError(f"Failed to filter list-of-optionals with '{key}': {e}") from e
        try:
            return self._go_inside(key)
        # Use the shared tuple like every other operator here, so that a
        # TypeError from a conversion is wrapped instead of escaping raw.
        except catchable_errors as e:
            raise FlatmapError(f"Failed to map list-of-optionals with '{key}': {e}") from e

    def __rshift__(self, key: Callable[[List[Optional[Any]]], Any]) -> JsonNavigatorSingleOptional:
        """Same as ``self // key``: there is only one level left to collapse."""
        # we can't skip 2
        return self // key

    def __floordiv__(
        self, key: Callable[[List[Optional[Any]]], Any]
    ) -> JsonNavigatorSingleOptional:
        """
        Apply ``key`` once to the whole list (``None`` entries included).

        Raises:
            FlatmapError: If ``key`` raises one of ``catchable_errors``.
        """
        try:
            return JsonNavigatorSingleOptional(key(self.contents))
        except catchable_errors as e:
            raise FlatmapError(
                f"Failed to flatmap from list-of-optionals with '{key}': {e}"
            ) from e

    def _go_inside(
        self, key: Union[None, str, Callable[[Any], Any]]
    ) -> JsonNavigatorListOfOptionals:
        """
        Convert every non-``None`` value; ``None`` entries stay ``None``.

        ``key`` is resolved to a callable through ``_get_conversion_fn``.
        """
        fn = _get_conversion_fn(key)
        return JsonNavigatorListOfOptionals(
            [None if content is None else fn(content) for content in self.contents]
        )

    def _filter(self, keep_if: Callable[[Optional[T]], bool]) -> JsonNavigatorListOfOptionals:
        """Keep the values for which ``keep_if(value)`` is truthy (``None`` is passed too)."""
        return JsonNavigatorListOfOptionals([z for z in self.contents if keep_if(z)])
||
252 | |||
253 | |||
@dataclass(frozen=True, eq=True)
class JsonNavigatorSingleOptional(AbstractJsonNavigator):
    """An immutable wrapper around one value that may be ``None``."""

    contents: Optional[T]

    @property
    def get(self) -> Optional[T]:
        """The wrapped value (possibly ``None``)."""
        return self.contents

    def __floordiv__(
        self, conversion: Union[Type[T], Callable[[T], V]]
    ) -> JsonNavigatorSingleOptional:
        """
        Apply ``conversion`` to the wrapped value and wrap the result.

        The value is passed to ``conversion`` even when it is ``None``.

        Raises:
            FlatmapError: If ``conversion`` raises one of ``catchable_errors``.
        """
        try:
            return JsonNavigatorSingleOptional(conversion(self.contents))
        except catchable_errors as e:
            # Chain explicitly so the original failure stays attached as __cause__.
            raise FlatmapError(f"Failed to map single-optional with '{conversion}': {e}") from e
||
269 | |||
270 | |||
# Names exported by ``from ... import *``: the navigator classes, then the errors.
__all__ = [
    "JsonNavigator", "JsonNavigatorListOfLists", "JsonNavigatorListOfOptionals",
    "AbstractJsonNavigator", "JsonNavigatorSingleOptional",
    "NavError", "MapError", "FlatmapError", "FilterError",
]
||
282 |