| Total Complexity | 55 |
| Total Lines | 281 |
| Duplicated Lines | 7.12 % |
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A commonly used rule is to restructure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like mandos.model.pubchem_support._nav often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | from __future__ import annotations |
||
|
|
|||
| 2 | from dataclasses import dataclass |
||
| 3 | from typing import ( |
||
| 4 | List, |
||
| 5 | Union, |
||
| 6 | Sequence, |
||
| 7 | Callable, |
||
| 8 | Set, |
||
| 9 | FrozenSet, |
||
| 10 | Any, |
||
| 11 | Optional, |
||
| 12 | Type, |
||
| 13 | TypeVar, |
||
| 14 | Iterable, |
||
| 15 | ) |
||
| 16 | |||
| 17 | from pocketutils.core.dot_dict import NestedDotDict |
||
| 18 | from pocketutils.tools.base_tools import BaseTools |
||
| 19 | |||
| 20 | from mandos.model.pubchem_support._nav_model import FilterFn |
||
| 21 | |||
| 22 | |||
# Generic placeholders used in the annotations of the navigator classes below.
T = TypeVar("T", covariant=True)
V = TypeVar("V", covariant=True)

# Exception types that the navigator operators intercept and re-raise as a NavError subclass.
catchable_errors = (KeyError, ValueError, LookupError, TypeError)
||
| 27 | |||
| 28 | |||
class NavError(Exception):
    """Root of the exception hierarchy raised by the JSON navigator classes."""


class MapError(NavError):
    """Raised when descending into a key or applying per-item conversions (``/``) fails."""


class FlatmapError(NavError):
    """Raised when collapsing a navigator into a simpler one (``//`` or ``>>``) fails."""


class FilterError(NavError):
    """Raised when applying a filter to a navigator (``/`` with a filter) fails."""
||
| 43 | |||
| 44 | |||
def _identity(x: T) -> T:
    """Return ``x`` unchanged; used as the no-op conversion function."""
    return x
||
| 47 | |||
| 48 | |||
| 49 | # This happens to be duplicated in nav_utils, |
||
| 50 | # but I *really* don't want that dependency |
||
def _request_only(things: Iterable[str]) -> Optional[str]:
    """
    Return the single item of ``things``, stripped of surrounding whitespace.

    ``None`` entries are dropped before counting.

    Returns:
        The stripped item, or ``None`` if no items remain.

    Raises:
        ValueError: If more than one item remains.
    """
    # TODO: Did I mean to exclude None here?
    stripped = [item.strip() for item in things if item is not None]
    if not stripped:
        return None
    if len(stripped) == 1:
        return stripped[0]
    raise ValueError(f"{len(stripped)} items in {stripped}")
||
| 60 | |||
| 61 | |||
def _get_conversion_fn(fn: Union[None, str, Callable[[Any], Any]]) -> Callable[[Any], Any]:
    """
    Resolve a conversion spec into a callable.

    Args:
        fn: ``None`` selects the identity function; any string selects ``_request_only``
            (the string's content is not inspected); anything else is returned as-is.

    Returns:
        The callable to apply to a value.
    """
    if fn is None:
        return _identity
    return _request_only if isinstance(fn, str) else fn
||
| 69 | |||
| 70 | |||
@dataclass(eq=True, frozen=True)
class AbstractJsonNavigator:
    """Common base type of the JSON navigator classes; declares no fields or methods itself."""
||
| 74 | |||
| 75 | |||
@dataclass(frozen=True, eq=True)
class JsonNavigator(AbstractJsonNavigator):
    """
    Immutable wrapper around a list of ``NestedDotDict`` nodes, traversed with operators.

    Operators:
        - ``nav / key``: descend into ``key`` of every node (see ``_go_inside``).
        - ``nav / filter_fn``: keep only the matching nodes (see ``_filter``).
        - ``nav % key``: re-key the nodes by their value at ``key``.
        - ``nav // keys``: pull several keys out of every node, giving a list of lists.
        - ``nav >> key``: pull one key out of every node, giving a list of optionals.
    """

    contents: List[NestedDotDict]

    @classmethod
    def create(
        cls, dct: Union[dict, NestedDotDict, Sequence[dict], Sequence[NestedDotDict]]
    ) -> JsonNavigator:
        """
        Build a navigator from one mapping or a sequence of mappings.

        Anything exposing ``items`` is treated as a single mapping and wrapped in a list.
        Every mapping is copied into a new ``NestedDotDict`` with an extra ``_landmark`` key
        set to ``""``.
        """
        if hasattr(dct, "items"):
            dct = [dct]
        return JsonNavigator([NestedDotDict(dict(**d, _landmark="")) for d in dct])

    @property
    def get(self) -> List[NestedDotDict]:
        """The wrapped list of nodes."""
        return self.contents

    def __truediv__(
        self, key: Union[int, str, FilterFn, Callable[[NestedDotDict], NestedDotDict]]
    ) -> JsonNavigator:
        """
        Filter the nodes (if ``key`` is a ``FilterFn``) or descend into ``key``.

        Raises:
            FilterError: If filtering raised one of ``catchable_errors``.
            MapError: If descending raised one of ``catchable_errors``.
        """
        if isinstance(key, FilterFn):
            try:
                return self._filter(key)
            except catchable_errors as e:
                # Chain explicitly so the traceback shows the original error as the cause.
                raise FilterError(f"Failed to go filter navigator with '{key}': {e}") from e
        try:
            return self._go_inside(key)
        except catchable_errors as e:
            raise MapError(f"Failed to map navigator with '{key}': {e}") from e

    def __mod__(self, key: Union[int, str]) -> JsonNavigator:
        """
        Collapse the nodes into one node that maps each node's value at ``key`` to that node.

        Raises:
            ValueError: If two nodes share the same value at ``key``. Errors raised here are
                not wrapped in a ``NavError``.
        """
        new = {}
        for z in self.contents:
            marker = z[key]
            if marker in new:
                raise ValueError(f"{key} found twice")
            new[marker] = z
        return JsonNavigator([NestedDotDict(new)])

    def __floordiv__(self, keys: Sequence[str]) -> JsonNavigatorListOfLists:
        """
        Extract ``keys`` from every node, producing one inner list per node.

        Raises:
            FlatmapError: If the extraction raised one of ``catchable_errors``.
        """
        try:
            return JsonNavigatorListOfLists([[z.get(key) for key in keys] for z in self.contents])
        except catchable_errors as e:
            raise FlatmapError(f"Failed to flatmap from navigator with '{keys}': {e}") from e

    def __rshift__(self, key: str) -> JsonNavigatorListOfOptionals:
        """
        Extract ``key`` from every node, producing one (possibly ``None``) value per node.

        Raises:
            FlatmapError: If the extraction raised one of ``catchable_errors``.
        """
        try:
            return JsonNavigatorListOfOptionals([z.get(key) for z in self.contents])
        except catchable_errors as e:
            raise FlatmapError(
                f"Failed to 'double-flatmap' from navigator with '{key}': {e}"
            ) from e

    def _filter(self, keep_where: FilterFn) -> JsonNavigator:
        """
        Keep the nodes selected by ``keep_where``.

        A callable is used as a predicate on each node. Otherwise ``keep_where`` is unpacked
        as ``(key, values)`` and a node is kept when its value at ``key`` is in ``values``;
        a ``values`` that is not a set, frozenset, or list is treated as a single value.
        """
        if callable(keep_where):
            return JsonNavigator([z for z in self.contents if keep_where(z)])
        key, values = keep_where
        # Builtin classes rather than the deprecated typing aliases; same instance check.
        if not isinstance(values, (set, frozenset, list)):
            values = {values}
        return JsonNavigator([z for z in self.contents if z.get(key) in values])

    def _go_inside(self, key: Union[int, str]) -> JsonNavigator:
        """
        Descend into ``key`` of every node.

        Nodes that do not contain ``key`` are dropped. A list value contributes one new node
        per element; a ``NestedDotDict`` or ``dict`` value contributes a single new node.

        Raises:
            ValueError: If a value at ``key`` is neither a list nor a mapping type above.
        """
        new = []
        for z in self.contents:
            if key not in z:
                continue
            probe = z.get(key)
            if isinstance(probe, list):
                new.extend(NestedDotDict(dict(**m)) for m in z[key])
            elif isinstance(probe, (NestedDotDict, dict)):
                new.append(NestedDotDict(dict(**z[key])))
            else:
                raise ValueError(f"{key} value is {type(z[key])}: {z[key]}")
        return JsonNavigator(new)
||
| 150 | |||
| 151 | |||
@dataclass(frozen=True, eq=True)
class JsonNavigatorListOfLists(AbstractJsonNavigator):
    """
    Immutable wrapper around a list of lists of extracted values.

    Operators:
        - ``nav / keys``: apply one conversion per column to every inner list.
        - ``nav / filter_fn``: keep only the inner lists accepted by the filter.
        - ``nav // conversion``: reduce every inner list to a single value.
        - ``nav >> conversion``: reduce the whole list of lists to a single value.
    """

    contents: List[List[Any]]

    def __truediv__(
        self,
        keys: Union[
            Sequence[Union[None, str, Callable[[Any], Any]]], FilterFn, Callable[[List[T]], Any]
        ],
    ) -> JsonNavigatorListOfLists:
        """
        Filter the inner lists (if ``keys`` is a ``FilterFn``) or convert their items.

        Raises:
            FilterError: If filtering raised one of ``catchable_errors``.
            MapError: If converting raised one of ``catchable_errors``.
        """
        if isinstance(keys, FilterFn):
            try:
                return self._filter(keys)
            except catchable_errors as e:
                # Chain explicitly so the traceback shows the original error as the cause.
                raise FilterError(f"Failed to filter list-of-lists with '{keys}': {e}") from e
        try:
            return self._go_inside(keys)
        except catchable_errors as e:
            raise MapError(f"Failed to map list-of-lists with '{keys}': {e}") from e

    def __rshift__(self, conversion: Callable[[List[List[T]]], Any]) -> JsonNavigatorSingleOptional:
        """
        Apply ``conversion`` to the entire list of lists and wrap the single result.

        Raises:
            FlatmapError: If ``conversion`` raised one of ``catchable_errors``.
        """
        try:
            return JsonNavigatorSingleOptional(conversion(self.contents))
        except catchable_errors as e:
            raise FlatmapError(
                f"Failed to 'double-flatmap' from list-of-lists with '{conversion}': {e}"
            ) from e

    def __floordiv__(self, conversion: Callable[[List[T]], Any]) -> JsonNavigatorListOfOptionals:
        """
        Apply ``conversion`` to each inner list, producing one value per inner list.

        Raises:
            FlatmapError: If ``conversion`` raised one of ``catchable_errors``.
        """
        try:
            return JsonNavigatorListOfOptionals([conversion(z) for z in self.contents])
        except catchable_errors as e:
            raise FlatmapError(
                f"Failed to flatmap from list-of-lists with '{conversion}': {e}"
            ) from e

    def _filter(self, keep_if: FilterFn) -> JsonNavigatorListOfLists:
        """Keep the inner lists for which ``keep_if`` returns a truthy value."""
        return JsonNavigatorListOfLists([z for z in self.contents if keep_if(z)])

    def _go_inside(
        self, keys: Sequence[Union[None, str, Callable[[Any], Any]]]
    ) -> JsonNavigatorListOfLists:
        """
        Convert every inner list item with the conversion at the same position in ``keys``.

        Each entry of ``keys`` is resolved through ``_get_conversion_fn``. Items and
        conversions are paired by ``BaseTools.zip_list``.
        """
        # NOTE(review): zip_list presumably rejects length mismatches -- confirm in pocketutils.
        fns = [_get_conversion_fn(fn) for fn in keys]
        return JsonNavigatorListOfLists(
            [
                [fn(value) for value, fn in BaseTools.zip_list(contents, fns)]
                for contents in self.contents
            ]
        )
||
| 200 | |||
| 201 | |||
@dataclass(frozen=True, eq=True)
class JsonNavigatorListOfOptionals(AbstractJsonNavigator):
    """
    Immutable wrapper around a flat list of values, any of which may be ``None``.

    Operators:
        - ``nav / key``: convert every non-``None`` item.
        - ``nav / filter_fn``: keep only the items accepted by the filter.
        - ``nav // fn`` and ``nav >> fn``: reduce the whole list to a single value.
    """

    contents: List[Optional[T]]

    @property
    def to_list(self) -> List[T]:
        """The non-``None`` items, in order."""
        return [k for k in self.contents if k is not None]

    @property
    def to_set(self) -> FrozenSet[T]:
        """The non-``None`` items as a frozenset."""
        return frozenset(k for k in self.contents if k is not None)

    def __truediv__(
        self, key: Union[None, str, FilterFn, Callable[[Any], Any]]
    ) -> JsonNavigatorListOfOptionals:
        """
        Filter the items (if ``key`` is a ``FilterFn``) or convert each non-``None`` item.

        Raises:
            FilterError: If filtering raised one of ``catchable_errors``.
            FlatmapError: If converting raised one of ``catchable_errors``.
        """
        if isinstance(key, FilterFn):
            try:
                return self._filter(key)
            except catchable_errors as e:
                # Chain explicitly so the traceback shows the original error as the cause.
                raise FilterError(f"Failed to filter list-of-optionals with '{key}': {e}") from e
        try:
            return self._go_inside(key)
        except catchable_errors as e:
            # Uses the shared tuple like every other handler in this module, so a TypeError
            # from a conversion is wrapped too. FlatmapError is kept as the raised type so
            # existing callers that catch it keep working.
            raise FlatmapError(f"Failed to map list-of-optionals with '{key}': {e}") from e

    def __rshift__(self, key: Callable[[List[Optional[Any]]], Any]) -> JsonNavigatorSingleOptional:
        """Same as ``//``: there is only one level left to collapse, so two cannot be skipped."""
        return self // key

    def __floordiv__(
        self, key: Callable[[List[Optional[Any]]], Any]
    ) -> JsonNavigatorSingleOptional:
        """
        Apply ``key`` to the entire list (``None`` items included) and wrap the result.

        Raises:
            FlatmapError: If ``key`` raised one of ``catchable_errors``.
        """
        try:
            return JsonNavigatorSingleOptional(key(self.contents))
        except catchable_errors as e:
            raise FlatmapError(
                f"Failed to flatmap from list-of-optionals with '{key}': {e}"
            ) from e

    def _go_inside(
        self, key: Union[None, str, Callable[[Any], Any]]
    ) -> JsonNavigatorListOfOptionals:
        """
        Convert every non-``None`` item; ``None`` items are passed through untouched.

        ``key`` is resolved to a callable through ``_get_conversion_fn``.
        """
        fn = _get_conversion_fn(key)
        return JsonNavigatorListOfOptionals(
            [None if content is None else fn(content) for content in self.contents]
        )

    def _filter(self, keep_if: Callable[[Optional[T]], bool]) -> JsonNavigatorListOfOptionals:
        """Keep the items (``None`` included) for which ``keep_if`` returns a truthy value."""
        return JsonNavigatorListOfOptionals([z for z in self.contents if keep_if(z)])
||
| 252 | |||
| 253 | |||
@dataclass(frozen=True, eq=True)
class JsonNavigatorSingleOptional(AbstractJsonNavigator):
    """Immutable wrapper around a single value, which may be ``None``."""

    contents: Optional[T]

    @property
    def get(self) -> T:
        """The wrapped value."""
        return self.contents

    def __floordiv__(
        self, conversion: Union[Type[T], Callable[[T], V]]
    ) -> JsonNavigatorSingleOptional:
        """
        Apply ``conversion`` to the wrapped value and wrap the result.

        The value is passed to ``conversion`` even when it is ``None``.

        Raises:
            FlatmapError: If ``conversion`` raised one of ``catchable_errors``.
        """
        try:
            return JsonNavigatorSingleOptional(conversion(self.contents))
        except catchable_errors as e:
            # Chain explicitly so the traceback shows the original error as the cause.
            raise FlatmapError(f"Failed to map single-optional with '{conversion}': {e}") from e
||
| 269 | |||
| 270 | |||
# Public API of this module; the private helpers and TypeVars are deliberately left out.
__all__ = [
    # navigator classes
    "JsonNavigator",
    "JsonNavigatorListOfLists",
    "JsonNavigatorListOfOptionals",
    "AbstractJsonNavigator",
    "JsonNavigatorSingleOptional",
    # exception hierarchy
    "NavError",
    "MapError",
    "FlatmapError",
    "FilterError",
]
||
| 282 |