Total Complexity | 55 |
Total Lines | 577 |
Duplicated Lines | 77.3 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like barentsz._discover often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | import glob |
||
2 | import re |
||
3 | import sys |
||
4 | from importlib import import_module |
||
5 | from inspect import ( |
||
6 | getmembers, |
||
7 | isclass, |
||
8 | isfunction, |
||
9 | ismethod, |
||
10 | ) |
||
11 | from pathlib import Path |
||
12 | from typing import ( |
||
13 | Any, |
||
14 | Callable, |
||
15 | Dict, |
||
16 | Iterable, |
||
17 | List, |
||
18 | Optional, |
||
19 | Set, |
||
20 | Tuple, |
||
21 | Type, |
||
22 | TypeVar, |
||
23 | Union, |
||
24 | ) |
||
25 | |||
26 | from typish import ( |
||
27 | Module, |
||
28 | instance_of, |
||
29 | subclass_of, |
||
30 | ) |
||
31 | |||
32 | from barentsz._attribute import Attribute |
||
33 | from barentsz._here import here |
||
34 | from barentsz._typings import ClsPredicate |
||
35 | |||
36 | |||
def discover(
        source: Any = None,
        *,
        what: Any = List[type],
        **kwargs: dict,
) -> list:
    """
    Discover elements of the requested kind in some source. When no source
    is given, ``here(1)`` is used to determine where to search.

    Args:
        source: the source that is searched, or a falsy value to fall back
        to the location of the caller.
        what: the type that describes what is to be discovered.
        **kwargs: keyword arguments that are forwarded to the delegate.

    Returns: a list of discoveries.

    Raises:
        ValueError: if ``what`` is not a subclass of a supported type.

    """
    if not source:
        source = here(1)

    # Pairs of (supported type, function that performs the discovery).
    dispatch_table = (
        (List[type], _discover_list),
        (list, _discover_list),
        (List, _discover_list),
    )

    for supported_type, handler in dispatch_table:
        if subclass_of(what, supported_type):
            return handler(what, source, **kwargs)

    accepted_types = ', '.join('`{}`'.format(supported_type)
                               for supported_type, _ in dispatch_table)
    raise ValueError('Type `{}` is not supported. This function accepts: '
                     '{}'.format(what, accepted_types))
||
73 | |||
74 | |||
def discover_paths(directory: Union[Path, str], pattern: str) -> List[Path]:
    """
    Return a sorted list of Paths within the given directory that match the
    given pattern.

    The absolute path of the directory is also inserted at the front of
    ``sys.path`` on every call.

    Args:
        directory: the directory in which is searched for paths.
        pattern: a glob pattern (example: '**/*.py'); '**' is recursive.

    Returns: a sorted list of Path objects.

    """
    directory_path = _path(directory)

    # NOTE(review): this side effect is kept exactly as it was; importing
    # the discovered modules by name presumably relies on it -- confirm
    # before deduplicating or removing it.
    sys.path.insert(0, str(directory_path.absolute()))

    # Only ``pattern`` may contain glob syntax. Escaping the directory part
    # keeps characters such as '[' or '*' in a directory name from being
    # interpreted as a pattern (which would silently match nothing).
    escaped_directory = Path(glob.escape(str(directory_path)))
    path_to_discover = escaped_directory.joinpath(pattern)

    return sorted(Path(filename) for filename in
                  glob.iglob(str(path_to_discover), recursive=True))
||
95 | |||
96 | |||
def discover_packages(directory: Union[Path, str]) -> List[str]:
    """
    Return the sorted names of the packages within the given directory. The
    directory must be a package.
    Args:
        directory: the directory in which is searched for packages.

    Returns: a sorted list of package names.

    """
    packages_per_path = _discover_packages_per_path(directory)
    return sorted(packages_per_path.values())
||
110 | |||
111 | |||
def discover_module_names(
        directory: Union[Path, str],
        include_privates: bool = False) -> List[str]:
    """
    Return the sorted, dotted names of the modules within the given
    directory. The directory must be a package and only modules that are
    located in a package are listed.
    Args:
        directory: the directory in which is searched for modules.
        include_privates: if True, modules whose file name starts with an
        underscore are listed as well ('__init__' is never listed).

    Returns: a sorted list of module names (strings).

    """
    module_names = []
    packages_per_path = _discover_packages_per_path(directory)
    for package_path, package_name in packages_per_path.items():
        for module_path in discover_paths(package_path, '*.py'):
            stem = module_path.stem
            if stem == '__init__':
                continue
            if stem.startswith('_') and not include_privates:
                continue
            module_names.append('{}.{}'.format(package_name, stem))
    return sorted(module_names)
||
136 | |||
137 | |||
def discover_modules(
        directory: Union[Path, str],
        include_privates: bool = False,
        raise_on_fail: bool = False) -> List[Module]:
    """
    Import and return the modules within the given directory, sorted by
    module name. The directory must be a package and only modules that are
    located in a package are imported.
    Args:
        directory: the directory in which is searched for modules.
        include_privates: if True, privates (unders and dunders) are also
        included.
        raise_on_fail: if True, an ImportError is raised as soon as a module
        fails to import; otherwise that module is silently left out.

    Returns: a list of module objects.

    """
    imported_modules = []
    for module_name in discover_module_names(directory, include_privates):
        try:
            imported_modules.append(import_module(module_name))
        except Exception as err:
            # Best effort by design: failures only surface on request.
            if raise_on_fail:
                raise ImportError(err) from err
    return sorted(imported_modules, key=lambda mod: mod.__name__)
||
166 | |||
167 | |||
def discover_classes(
        source: Union[Path, str, Module, Iterable[Module]],
        signature: type = Any,  # type: ignore
        include_privates: bool = False,
        in_private_modules: bool = False,
        raise_on_fail: bool = False,
        exclude: Union[type, ClsPredicate,
                       Iterable[Union[type, ClsPredicate]]] = None
) -> List[type]:
    """
    Discover the classes within the given source that satisfy the given
    constraints. Every class is listed once and the result is sorted by
    class name.

    Args:
        source: the source in which is searched for any classes.
        signature: only classes that inherit from signature are returned;
        Any disables this check.
        include_privates: if True, private classes are included as well.
        in_private_modules: if True, private modules are explored as well.
        raise_on_fail: if True, raises an ImportError upon the first import
        failure.
        exclude: one or more types or predicates; a class is left out if it
        is one of the types or if a predicate function returns a truthy
        value for it.

    Returns: a list of all discovered classes (types).

    """
    excluded = _ensure_set(exclude)
    predicates = [item for item in excluded if isfunction(item)]

    # A set removes classes that are reachable from more than one module.
    unique_classes = set()
    for cls in _discover_elements(source, isclass, include_privates,
                                  in_private_modules, raise_on_fail):
        if signature is not Any and not subclass_of(cls, signature):
            continue
        if cls in excluded:
            continue
        unique_classes.add(cls)

    remaining = [cls for cls in unique_classes
                 if not any(pred(cls) for pred in predicates)]  # type: ignore[operator] # noqa
    return sorted(remaining, key=lambda cls: cls.__name__)
||
206 | |||
207 | |||
def discover_functions(
        source: Union[Path, str, Module, Iterable[Module], type],
        signature: Type[Callable] = Callable,  # type: ignore
        include_privates: bool = False,
        in_private_modules: bool = False,
        raise_on_fail: bool = False) -> List[type]:
    """
    Discover the functions within the given source that satisfy the given
    constraints. The result is sorted by function name.

    Args:
        source: the source in which is searched for any functions. If it is
        a class, its methods are discovered as well.
        signature: only functions that have this signature (parameters and
        return type) are included; Callable disables this check.
        include_privates: if True, private functions are included as well.
        in_private_modules: if True, private modules are explored as well.
        raise_on_fail: if True, raises an ImportError upon the first import
        failure.

    Returns: a list of all discovered functions.

    """
    if isinstance(source, type):
        # Members of a class may be plain functions or bound methods.
        def member_filter(*args_: Iterable[Any]) -> bool:
            return (isfunction(*args_)
                    or ismethod(*args_))
    else:
        member_filter = isfunction  # type: ignore

    candidates = _discover_elements(source, member_filter, include_privates,
                                    in_private_modules, raise_on_fail)
    return sorted(
        (candidate for candidate in candidates
         if signature is Callable or instance_of(candidate, signature)),
        key=lambda func: func.__name__)
||
244 | |||
245 | |||
def discover_attributes(
        source: Union[Path, str, Module, Iterable[Module]],
        signature: type = Any,  # type: ignore
        include_privates: bool = False,
        in_private_modules: bool = False,
        raise_on_fail: bool = False) -> List[Attribute]:
    """
    Discover any attributes within the given source and according to the given
    constraints. The source file of every module is read and scanned line by
    line; the result is sorted by attribute name.

    Args:
        source: the source in which is searched for any attributes.
        signature: only attributes that are subtypes of this signature are
        included.
        include_privates: if True, private attributes are included as well.
        in_private_modules: if True, private modules are explored as well.
        raise_on_fail: if True, raises an ImportError upon the first import
        failure.

    Returns: a list of all discovered attributes.

    """
    # Imported locally because only this function needs it.
    import tokenize

    modules = _get_modules_from_source(source, in_private_modules,
                                       raise_on_fail)
    attributes: List[Attribute] = []
    for module in modules:
        # tokenize.open decodes the file with the encoding that the source
        # declares itself (UTF-8 when nothing is declared) rather than with
        # the platform's locale encoding, which may not be able to read it.
        with tokenize.open(module.__file__) as module_file:
            lines = list(module_file)
        attributes += _discover_attributes_in_lines(
            lines, module, signature, include_privates)
    attributes.sort(key=lambda attr: attr.name)
    return attributes
||
278 | |||
279 | |||
def _discover_attributes_in_lines(
        lines: List[str],
        module: Module,
        signature: type,
        include_privates: bool) -> List[Attribute]:
    """
    Discover any attributes within the lines of code and according to the
    given constraints.

    Args:
        lines: the lines of code in which is searched for any attributes.
        module: the module from which these lines originate.
        signature: only attributes that are subtypes of this signature are
        included.
        include_privates: if True, private attributes are included as well.

    Returns: a list of all discovered attributes, in order of appearance.

    """
    attributes = []
    for index, line in enumerate(lines):
        match = _match_attribute(line)
        if not match:
            continue
        name, hint, value, comment = match
        if not hasattr(module, name):
            # The line-based pattern also matches assignments that do not
            # define a module attribute (e.g. an indented local variable).
            # Such a name cannot be looked up on the module, so it is
            # skipped instead of raising an AttributeError.
            continue
        # Everything above the current line may hold the docstring.
        docstring = _find_attribute_docstring(lines[0:index])
        attribute = _create_attribute(name, hint, value, docstring,
                                      comment, module, line, index + 1)
        if (instance_of(attribute.value, signature)
                and (attribute.is_public or include_privates)):
            attributes.append(attribute)
    return attributes
||
311 | |||
312 | |||
def _discover_elements(
        source: Union[Path, str, Module, Iterable[Module], type],
        filter_: Callable[[Any], bool],
        include_privates: bool = False,
        in_private_modules: bool = False,
        raise_on_fail: bool = False) -> List[Any]:
    """
    Collect the members of the given source that are accepted by filter_.
    Args:
        source: the source that is explored; a class is inspected directly,
        anything else is first turned into modules.
        filter_: the predicate that is handed to ``getmembers``.
        include_privates: if True, members whose name starts with an
        underscore are returned as well.
        in_private_modules: if True, sources whose name starts with an
        underscore are examined as well.
        raise_on_fail: if True, an ImportError will be raised upon import
        failure.

    Returns: a list of elements.

    """
    if isinstance(source, type):
        sources = [source]  # type: Iterable
    else:
        sources = _get_modules_from_source(source, in_private_modules,
                                           raise_on_fail)

    found = []
    for src in sources:
        for _, member in getmembers(src, filter_):
            if not in_private_modules and src.__name__.startswith('_'):
                continue
            if not include_privates and member.__name__.startswith('_'):
                continue
            found.append(member)
    return found
||
343 | |||
344 | |||
def _discover_packages_per_path(
        directory: Union[Path, str]) -> Dict[Path, str]:
    """
    Discover packages and their original Paths within the given directory.
    Args:
        directory: the directory in which is searched for packages.

    Returns: a dict with Paths as keys and strings (the package names) as
    values.

    Raises:
        ValueError: if the directory does not exist or is not a package.

    """
    directory_path = _path(directory)
    if not directory_path.exists():
        raise ValueError('The given directory does not exist. '
                         'Given: {}'.format(directory))
    if not _is_package(directory_path):
        raise ValueError('The given directory must itself be a package. '
                         'Given: {}'.format(directory))

    # Every directory that holds an __init__.py is a candidate package.
    paths_to_inits = discover_paths(directory_path, '**/__init__.py')
    paths = [p.parent for p in paths_to_inits]
    packages_per_path = {p: _to_package_name(p) for p in paths}

    # All packages must have a straight line of packages from the base package.
    # The comparison is made on the dotted boundary: a plain prefix check
    # would also accept a detached package whose name merely starts with the
    # name of the base package (base 'foo', detached 'foo_sub').
    base_package = _to_package_name(directory_path)
    base_prefix = base_package + '.'
    result = {path: package for path, package in packages_per_path.items()
              if package == base_package or package.startswith(base_prefix)}

    return result
||
374 | |||
375 | |||
376 | View Code Duplication | def _path(directory: Union[Path, str]) -> Path: |
|
377 | """ |
||
378 | Return a path if directory is a string or return directory if it is a Path |
||
379 | already. Raise a ValueError if it is neither a Path nor a string. |
||
380 | |||
381 | Args: |
||
382 | directory: the directory that is a string or Path. |
||
383 | |||
384 | Returns: a Path instance. |
||
385 | |||
386 | """ |
||
387 | if isinstance(directory, Path): |
||
388 | result = directory |
||
389 | elif isinstance(directory, str): |
||
390 | result = Path(directory) |
||
391 | else: |
||
392 | raise ValueError('Invalid type ({}) for directory, provide a Path or ' |
||
393 | 'a string.'.format(type(directory))) |
||
394 | return result |
||
395 | |||
396 | |||
def _get_modules_from_source(
        source: Union[Path, str, Module, Iterable[Module]],
        in_private_modules: bool = False,
        raise_on_fail: bool = False
) -> Iterable[Module]:
    """
    Turn the given source into an iterable of Modules.
    Args:
        source: a Path or string (the modules in that directory are
        imported), a single Module, or an iterable of Modules.
        in_private_modules: if True, private modules are explored as well.
        raise_on_fail: if True, raises an ImportError upon the first import
        failure.

    Returns: an iterable of Module instances.

    Raises:
        ValueError: if the source is of none of the accepted kinds.

    """
    if isinstance(source, str):
        return discover_modules(Path(source), in_private_modules,
                                raise_on_fail)
    if isinstance(source, Path):
        return discover_modules(source, in_private_modules, raise_on_fail)
    if isinstance(source, Module):
        return [source]
    if instance_of(source, Iterable[Module]):
        return source  # type: ignore
    raise ValueError('The given source must be a Path, string or module. '
                     'Given: {}'.format(source))
||
426 | |||
427 | |||
428 | View Code Duplication | def _match_attribute(line: str) -> Optional[Tuple[str, str, str, str]]: |
|
429 | """ |
||
430 | Try to match the given line with an attribute and return the name, |
||
431 | type hint, value and inline comment (respectively) if a match was |
||
432 | found. |
||
433 | |||
434 | Args: |
||
435 | line: the line of code that (may) contain an attribute declaration. |
||
436 | |||
437 | Returns: a tuple with strings (name, hint, value, comment) or None. |
||
438 | |||
439 | """ |
||
440 | attr_pattern = re.compile( |
||
441 | r'^' |
||
442 | r'\s*' |
||
443 | r'([a-zA-Z_]+[a-zA-Z_0-9]*)' # 1: Name. |
||
444 | r'(\s*:\s*(\w+)\s*)?' # 3: Type hint. |
||
445 | r'\s*=\s*' |
||
446 | r'(.+?)' # 4: Value. |
||
447 | r'\s*' |
||
448 | r'(#\s*(.*?)\s*)?' # 6: Inline comment. |
||
449 | r'$' |
||
450 | ) |
||
451 | match = attr_pattern.match(line) |
||
452 | result = None |
||
453 | if match: |
||
454 | attr_name = match.group(1) |
||
455 | hint = match.group(3) |
||
456 | attr_value = match.group(4) |
||
457 | inline_comments = match.group(6) |
||
458 | result = attr_name, hint, attr_value, inline_comments |
||
459 | return result |
||
460 | |||
461 | |||
def _create_attribute(
        name: str,
        hint: Optional[str],
        assigned_value: str,
        docstring: Optional[str],
        comment: Optional[str],
        module: Module,
        line: str,
        line_nr: int) -> Attribute:
    """
    Build an Attribute from the given parameters. The actual value is looked
    up on the module by name and its type is derived from that value.
    Args:
        name: the name of the attribute.
        hint: the type hint of the attribute (if any).
        assigned_value: the string that was literally assigned.
        docstring: the docstring above this attribute.
        comment: an inline comment (if any).
        module: the module that contains the attribute.
        line: the line that defines the attribute.
        line_nr: the line number of the attribute.

    Returns: an Attribute instance.

    """
    actual_value = getattr(module, name)
    return Attribute(
        name=name,
        type_=type(actual_value),
        value=actual_value,
        doc=docstring,
        comment=comment,
        hint=hint,
        module=module,
        assigned_value=assigned_value,
        line=line,
        line_nr=line_nr
    )
||
500 | |||
501 | |||
def _is_package(directory: Path) -> bool:
    """
    Tell whether the given directory is a package, that is: whether an
    '__init__.py' is found directly inside it.
    Args:
        directory: the directory to check.

    Returns: True if directory is a package.

    """
    init_files = discover_paths(directory, '__init__.py')
    return bool(init_files)
||
513 | |||
514 | |||
def _to_package_name(directory: Path) -> str:
    """
    Translate the given directory to a fully qualified package name by
    walking up the tree for as long as the parent directories are packages
    themselves.
    Args:
        directory: the directory that is to become a package name.

    Returns: a package name as string (empty if directory is no package).

    """
    innermost_first: List[str] = []
    candidate = directory.absolute()
    while _is_package(candidate):
        # Collected from the inside out; put in the right order afterwards.
        innermost_first.append(candidate.stem)
        candidate = candidate.parent
    return '.'.join(reversed(innermost_first))
||
532 | |||
533 | |||
534 | View Code Duplication | def _find_attribute_docstring(lines: List[str]) -> Optional[str]: |
|
535 | """ |
||
536 | Find any docstring that is right above an attribute. |
||
537 | Args: |
||
538 | lines: the lines of code that may contain a docstring. |
||
539 | |||
540 | Returns: a docstring (str) or None. |
||
541 | |||
542 | """ |
||
543 | result = None |
||
544 | if lines: |
||
545 | joined_lines = ''.join(lines).strip() |
||
546 | docstring_pattern = re.compile( |
||
547 | r'("{3}\s*([\s\S]+)\s*"{3}|' # 2: docstring content. |
||
548 | r'\'{3}\s*([\s\S]+)\s*\'{3})' # 3: docstring content. |
||
549 | r'$' |
||
550 | ) |
||
551 | match = docstring_pattern.match(joined_lines) |
||
552 | if match: |
||
553 | result = (match.group(2) or match.group(3)).strip() |
||
554 | return result |
||
555 | |||
556 | |||
557 | def _ensure_set(arg: Union[object, Iterable[object]]) -> Set[object]: |
||
558 | # Make sure that arg is a set. |
||
559 | result = arg or set() |
||
560 | if not isinstance(result, Iterable): |
||
561 | result = {result} |
||
562 | else: |
||
563 | result = set(result) |
||
564 | return result |
||
565 | |||
566 | |||
def _discover_list(
        what_: List[type],
        source: Union[Path, str, Module, Iterable[Module]],
        **kwargs: dict) -> List[type]:
    """
    Discover classes in the given source, using the first type argument of
    what_ as the signature that the classes must have.
    Args:
        what_: a (possibly parameterized) list type, e.g. List[SomeClass].
        source: the source in which is searched for classes.
        **kwargs: keyword arguments that are passed on to discover_classes;
        a given 'signature' is overridden.

    Returns: a list of the discovered classes.

    """
    type_args = getattr(what_, '__args__', None)
    signature = type_args[0] if type_args else Any
    # An unparameterized list or a list of 'type' puts no constraint on the
    # classes that are to be discovered.
    if signature in (type, Type) or isinstance(signature, TypeVar):  # type: ignore[arg-type] # noqa
        signature = Any
    forwarded = dict(kwargs, signature=signature)
    return discover_classes(source, **forwarded)  # type: ignore[arg-type]
||
577 |