Total Complexity | 54 |
Total Lines | 562 |
Duplicated Lines | 78.47 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like barentsz._discover often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | import glob |
||
2 | import inspect |
||
3 | import re |
||
4 | import sys |
||
5 | from importlib import import_module |
||
6 | from pathlib import Path |
||
7 | from typing import ( |
||
8 | Union, |
||
9 | Dict, |
||
10 | List, |
||
11 | Any, |
||
12 | Callable, |
||
13 | Type, |
||
14 | Iterable, |
||
15 | Optional, |
||
16 | Tuple, |
||
17 | Set, |
||
18 | TypeVar, |
||
19 | ) |
||
20 | |||
21 | from typish import Module, subclass_of, instance_of |
||
22 | |||
23 | from barentsz._here import here |
||
24 | from barentsz._attribute import Attribute |
||
25 | |||
26 | |||
def discover(
        source: Any = None,
        *,
        what: Any = List[type],
        **kwargs: dict,
) -> list:
    """
    Discover whatever ``what`` describes in some source. If the given source
    is falsy, the result of ``here(1)`` is used as the source instead.

    Args:
        source: the source in which is searched, or None to fall back to
        ``here(1)``.
        what: the type that is to be discovered; it must be a subclass (as
        judged by ``subclass_of``) of one of the supported list types.
        **kwargs: any keyword argument; passed on to the selected delegate.

    Returns: a list of discoveries.

    Raises:
        ValueError: if ``what`` matches none of the supported types.

    """
    # Keep this call directly in the function body: here() receives a stack
    # depth, so wrapping it in another frame would change its meaning.
    source = source or here(1)

    delegates = [
        (List[type], _discover_list),
        (list, _discover_list),
        (List, _discover_list),
    ]

    # The first supported type that `what` is a subclass of wins.
    handler = next(
        (candidate for supported_type, candidate in delegates
         if subclass_of(what, supported_type)),
        None,
    )
    if handler is not None:
        return handler(what, source, **kwargs)

    accepted_types = ', '.join('`{}`'.format(supported_type)
                               for supported_type, _ in delegates)
    raise ValueError('Type `{}` is not supported. This function accepts: '
                     '{}'.format(what, accepted_types))
||
63 | |||
64 | |||
def discover_paths(directory: Union[Path, str], pattern: str) -> List[Path]:
    """
    Return a sorted list of Paths within the given directory that match the
    given pattern.

    As a side effect, the absolute path of the directory is inserted at the
    front of ``sys.path`` on every call.

    Args:
        directory: the directory in which is searched for paths. It is taken
        literally: glob wildcards in the directory itself are escaped.
        pattern: a glob pattern (example: '**/*.py').

    Returns: a list of Path objects.

    """
    directory_path = _path(directory)
    abspath = str(directory_path.absolute())
    # NOTE(review): left unconditional on purpose. This is the only place in
    # this module that touches sys.path, and importing the discovered modules
    # presumably relies on these entries -- confirm before deduplicating.
    sys.path.insert(0, abspath)

    # Only `pattern` may contain wildcards. Without escaping, a directory
    # such as 'data[1]' would be read as a character class and match nothing.
    escaped_directory = Path(glob.escape(str(directory_path)))
    path_to_discover = escaped_directory.joinpath(pattern)
    result = [Path(filename) for filename in
              glob.iglob(str(path_to_discover), recursive=True)]
    result.sort()
    return result
||
85 | |||
86 | |||
def discover_packages(directory: Union[Path, str]) -> List[str]:
    """
    List the names of all packages found within the given directory, sorted
    alphabetically. The directory must be a package itself.

    Args:
        directory: the directory in which is searched for packages.

    Returns: a sorted list of package names.

    """
    packages_per_path = _discover_packages_per_path(directory)
    return sorted(packages_per_path.values())
||
100 | |||
101 | |||
def discover_module_names(
        directory: Union[Path, str],
        include_privates: bool = False) -> List[str]:
    """
    List the dotted names of all modules within the given directory, sorted
    alphabetically. The directory must be a package and only modules that
    live in packages are reported. ``__init__`` files are never listed.

    Args:
        directory: the directory in which is searched for modules.
        include_privates: if True, modules whose file name starts with an
        underscore are listed as well.

    Returns: a sorted list of module names (strings).

    """
    module_names = []
    for package_dir, package in _discover_packages_per_path(directory).items():
        for module_path in discover_paths(package_dir, '*.py'):
            stem = module_path.stem
            if stem == '__init__':
                continue
            if stem.startswith('_') and not include_privates:
                continue
            module_names.append('{}.{}'.format(package, stem))
    return sorted(module_names)
||
126 | |||
127 | |||
def discover_modules(
        directory: Union[Path, str],
        include_privates: bool = False,
        raise_on_fail: bool = False) -> List[Module]:
    """
    Return a list of modules within the given directory. The directory must be
    a package and only modules are returned that are in packages.

    Args:
        directory: the directory in which is searched for modules.
        include_privates: if True, privates (unders and dunders) are also
        included.
        raise_on_fail: if True, an ImportError is raised upon failing to
        import any module. If False, modules that fail to import are skipped.

    Returns: a list of module objects, sorted by module name.

    Raises:
        ImportError: if raise_on_fail is True and a module fails to import;
        the original exception is attached as the cause.

    """
    result = []
    for module_name in discover_module_names(directory, include_privates):
        try:
            imported_module = import_module(module_name)
        except Exception as err:
            # Importing runs arbitrary module code, so any exception type may
            # surface here; skipping is the documented best-effort behaviour.
            if raise_on_fail:
                raise ImportError(err) from err
        else:
            result.append(imported_module)
    result.sort(key=lambda module: module.__name__)
    return result
||
156 | |||
157 | |||
def discover_classes(
        source: Union[Path, str, Module, Iterable[Module]],
        signature: type = Any,  # type: ignore
        include_privates: bool = False,
        in_private_modules: bool = False,
        raise_on_fail: bool = False,
        exclude: Optional[Union[Iterable[type], type]] = None
) -> List[type]:
    """
    Discover any classes within the given source and according to the given
    constraints.

    Args:
        source: the source in which is searched for any classes.
        signature: only classes that inherit from signature are returned.
        include_privates: if True, private classes are included as well.
        in_private_modules: if True, private modules are explored as well.
        raise_on_fail: if True, raises an ImportError upon the first import
        failure.
        exclude: a type or multiple types that are to be excluded from the
        result.

    Returns: a list of all discovered classes (types) without duplicates,
    sorted by class name.

    """
    exclude_ = _ensure_set(exclude)
    elements = _discover_elements(source, inspect.isclass, include_privates,
                                  in_private_modules, raise_on_fail)
    # A set removes classes that were found through more than one module.
    unique_classes = {cls for cls in elements
                      if (signature is Any or subclass_of(cls, signature))
                      and cls not in exclude_}
    # Set iteration order is arbitrary, so break ties between equally named
    # classes to keep the result order reproducible.
    return sorted(
        unique_classes,
        key=lambda cls: (cls.__name__,
                         str(getattr(cls, '__module__', '')),
                         str(getattr(cls, '__qualname__', ''))),
    )
||
191 | |||
192 | |||
def discover_functions(
        source: Union[Path, str, Module, Iterable[Module], type],
        signature: Type[Callable] = Callable,  # type: ignore
        include_privates: bool = False,
        in_private_modules: bool = False,
        raise_on_fail: bool = False) -> List[type]:
    """
    Discover any functions within the given source and according to the given
    constraints. If the source is a class, its methods are discovered too.

    Args:
        source: the source in which is searched for any functions.
        signature: only functions that have this signature (parameters and
        return type) are included.
        include_privates: if True, private functions are included as well.
        in_private_modules: if True, private modules are explored as well.
        raise_on_fail: if True, raises an ImportError upon the first import
        failure.

    Returns: a list of all discovered functions, sorted by function name.

    """
    if isinstance(source, type):
        # Members of a class may be plain functions or bound methods.
        def predicate(*candidate: Iterable[Any]) -> bool:
            return (inspect.isfunction(*candidate)
                    or inspect.ismethod(*candidate))
    else:
        predicate = inspect.isfunction  # type: ignore

    candidates = _discover_elements(source, predicate, include_privates,
                                    in_private_modules, raise_on_fail)
    functions = []
    for candidate in candidates:
        if signature is Callable or instance_of(candidate, signature):
            functions.append(candidate)
    return sorted(functions, key=lambda func: func.__name__)
||
229 | |||
230 | |||
def discover_attributes(
        source: Union[Path, str, Module, Iterable[Module]],
        signature: type = Any,  # type: ignore
        include_privates: bool = False,
        in_private_modules: bool = False,
        raise_on_fail: bool = False) -> List[Attribute]:
    """
    Discover any attributes within the given source and according to the given
    constraints. Attributes are found by reading the source file of every
    module; modules that have no ``__file__`` are skipped.

    Args:
        source: the source in which is searched for any attributes.
        signature: only attributes that are subtypes of this signature are
        included.
        include_privates: if True, private attributes are included as well.
        in_private_modules: if True, private modules are explored as well.
        raise_on_fail: if True, raises an ImportError upon the first import
        failure.

    Returns: a list of all discovered attributes, sorted by name.

    """
    # Imported here so this function carries its own dependency.
    import tokenize

    modules = _get_modules_from_source(source, in_private_modules,
                                       raise_on_fail)
    attributes: List[Attribute] = []
    for module in modules:
        module_path = getattr(module, '__file__', None)
        if not module_path:
            # Nothing to read for modules without a source file.
            continue
        # tokenize.open decodes the file the way the interpreter does (BOM or
        # coding cookie, UTF-8 otherwise) instead of using the locale
        # encoding that a bare open() would pick.
        with tokenize.open(module_path) as module_file:
            lines = list(module_file)
        attributes += _discover_attributes_in_lines(
            lines, module, signature, include_privates)
    attributes.sort(key=lambda attr: attr.name)
    return attributes
||
263 | |||
264 | |||
def _discover_attributes_in_lines(
        lines: List[str],
        module: Module,
        signature: type,
        include_privates: bool) -> List[Attribute]:
    """
    Discover any attributes within the lines of code and according to the
    given constraints.

    Args:
        lines: the lines of code in which is searched for any attributes.
        module: the module from which these lines originate.
        signature: only attributes that are subtypes of this signature are
        included.
        include_privates: if True, private attributes are included as well.

    Returns: a list of all discovered attributes, in order of appearance.

    """
    attributes = []
    for index, line in enumerate(lines):
        match = _match_attribute(line)
        if not match:
            continue
        name, hint, value, comment = match
        if not hasattr(module, name):
            # The attribute pattern tolerates leading whitespace, so it also
            # matches indented local assignments and keyword-argument lines.
            # Those names do not exist on the module and creating an
            # Attribute for them would fail on the getattr lookup.
            continue
        docstring = _find_attribute_docstring(lines[0:index])
        attribute = _create_attribute(name, hint, value, docstring,
                                      comment, module, line, index + 1)
        if (instance_of(attribute.value, signature)
                and (attribute.is_public or include_privates)):
            attributes.append(attribute)
    return attributes
||
296 | |||
297 | |||
def _discover_elements(
        source: Union[Path, str, Module, Iterable[Module], type],
        filter_: Callable[[Any], bool],
        include_privates: bool = False,
        in_private_modules: bool = False,
        raise_on_fail: bool = False) -> List[Any]:
    """
    Collect the members of the given source that pass the given filter.

    Args:
        source: the source that is explored; a class is inspected directly,
        anything else is first turned into modules.
        filter_: the predicate handed to ``inspect.getmembers`` that
        determines the type of element.
        include_privates: if True, elements whose ``__name__`` starts with an
        underscore are returned as well.
        in_private_modules: if True, sources whose ``__name__`` starts with
        an underscore are examined as well.
        raise_on_fail: if True, an ImportError will be raised upon import
        failure.

    Returns: a list of elements.

    """
    if isinstance(source, type):
        sources = [source]  # type: Iterable
    else:
        sources = _get_modules_from_source(source, in_private_modules,
                                           raise_on_fail)

    found = []
    for src in sources:
        for _, member in inspect.getmembers(src, filter_):
            if not in_private_modules and src.__name__.startswith('_'):
                continue
            if not include_privates and member.__name__.startswith('_'):
                continue
            found.append(member)
    return found
||
328 | |||
329 | |||
def _discover_packages_per_path(
        directory: Union[Path, str]) -> Dict[Path, str]:
    """
    Discover packages and their original Paths within the given directory.

    Args:
        directory: the directory in which is searched for packages; it must
        exist and be a package itself.

    Returns: a dict with Paths as keys and strings (the package names) as
    values.

    Raises:
        ValueError: if the directory does not exist or is not a package.

    """
    directory_path = _path(directory)
    if not directory_path.exists():
        raise ValueError('The given directory does not exist. '
                         'Given: {}'.format(directory))
    if not _is_package(directory_path):
        raise ValueError('The given directory must itself be a package. '
                         'Given: {}'.format(directory))

    paths_to_inits = discover_paths(directory_path, '**/__init__.py')
    paths = [p.parent for p in paths_to_inits]
    packages_per_path = {p: _to_package_name(p) for p in paths}

    # All packages must have a straight line of packages from the base package.
    # Compare on a whole name segment: a bare prefix test would also accept
    # an unrelated package such as 'foobar' for the base package 'foo'.
    base_package = _to_package_name(directory_path)
    sub_package_prefix = base_package + '.'
    result = {path: package for path, package in packages_per_path.items()
              if package == base_package
              or package.startswith(sub_package_prefix)}

    return result
||
359 | |||
360 | |||
361 | View Code Duplication | def _path(directory: Union[Path, str]) -> Path: |
|
362 | """ |
||
363 | Return a path if directory is a string or return directory if it is a Path |
||
364 | already. Raise a ValueError if it is neither a Path nor a string. |
||
365 | |||
366 | Args: |
||
367 | directory: the directory that is a string or Path. |
||
368 | |||
369 | Returns: a Path instance. |
||
370 | |||
371 | """ |
||
372 | if isinstance(directory, Path): |
||
373 | result = directory |
||
374 | elif isinstance(directory, str): |
||
375 | result = Path(directory) |
||
376 | else: |
||
377 | raise ValueError('Invalid type ({}) for directory, provide a Path or ' |
||
378 | 'a string.'.format(type(directory))) |
||
379 | return result |
||
380 | |||
381 | |||
def _get_modules_from_source(
        source: Union[Path, str, Module, Iterable[Module]],
        in_private_modules: bool = False,
        raise_on_fail: bool = False
) -> Iterable[Module]:
    """
    Turn the given source into an iterable of Modules.

    Args:
        source: a Path or string (a directory whose modules are discovered),
        a single Module, or an iterable of Modules.
        in_private_modules: if True, private modules are explored as well.
        raise_on_fail: if True, raises an ImportError upon the first import
        failure.

    Returns: an iterable of Module instances.

    Raises:
        ValueError: if the source is none of the supported kinds.

    """
    if isinstance(source, Path):
        return discover_modules(source, in_private_modules, raise_on_fail)
    if isinstance(source, str):
        return discover_modules(Path(source), in_private_modules,
                                raise_on_fail)
    if isinstance(source, Module):
        return [source]
    if instance_of(source, Iterable[Module]):
        return source  # type: ignore
    raise ValueError('The given source must be a Path, string or module. '
                     'Given: {}'.format(source))
||
411 | |||
412 | |||
413 | View Code Duplication | def _match_attribute(line: str) -> Optional[Tuple[str, str, str, str]]: |
|
414 | """ |
||
415 | Try to match the given line with an attribute and return the name, |
||
416 | type hint, value and inline comment (respectively) if a match was |
||
417 | found. |
||
418 | |||
419 | Args: |
||
420 | line: the line of code that (may) contain an attribute declaration. |
||
421 | |||
422 | Returns: a tuple with strings (name, hint, value, comment) or None. |
||
423 | |||
424 | """ |
||
425 | attr_pattern = re.compile( |
||
426 | r'^' |
||
427 | r'\s*' |
||
428 | r'([a-zA-Z_]+[a-zA-Z_0-9]*)' # 1: Name. |
||
429 | r'(\s*:\s*(\w+)\s*)?' # 3: Type hint. |
||
430 | r'\s*=\s*' |
||
431 | r'(.+?)' # 4: Value. |
||
432 | r'\s*' |
||
433 | r'(#\s*(.*?)\s*)?' # 6: Inline comment. |
||
434 | r'$' |
||
435 | ) |
||
436 | match = attr_pattern.match(line) |
||
437 | result = None |
||
438 | if match: |
||
439 | attr_name = match.group(1) |
||
440 | hint = match.group(3) |
||
441 | attr_value = match.group(4) |
||
442 | inline_comments = match.group(6) |
||
443 | result = attr_name, hint, attr_value, inline_comments |
||
444 | return result |
||
445 | |||
446 | |||
def _create_attribute(
        name: str,
        hint: Optional[str],
        assigned_value: str,
        docstring: Optional[str],
        comment: Optional[str],
        module: Module,
        line: str,
        line_nr: int) -> Attribute:
    """
    Build an Attribute instance out of the given parameters. The actual value
    is looked up on the module by name and its type is derived from it.

    Args:
        name: the name of the attribute.
        hint: the type hint of the attribute (if any).
        assigned_value: the string that was literally assigned.
        docstring: the docstring above this attribute.
        comment: an inline comment (if any).
        module: the module that contains the attribute.
        line: the line that defines the attribute.
        line_nr: the line number of the attribute.

    Returns: an Attribute instance.

    """
    actual_value = getattr(module, name)
    attribute = Attribute(
        name=name,
        type_=type(actual_value),
        value=actual_value,
        doc=docstring,
        comment=comment,
        hint=hint,
        module=module,
        assigned_value=assigned_value,
        line=line,
        line_nr=line_nr
    )
    return attribute
||
485 | |||
486 | |||
def _is_package(directory: Path) -> bool:
    """
    Tell whether the given directory is a package, that is: whether an
    ``__init__.py`` can be discovered directly inside it.

    Args:
        directory: the directory to check.

    Returns: True if directory is a package and False otherwise.

    """
    init_files = discover_paths(directory, '__init__.py')
    return bool(init_files)
||
498 | |||
499 | |||
def _to_package_name(directory: Path) -> str:
    """
    Translate the given directory into a fully qualified package name (str)
    by climbing the tree for as long as the parent is a package as well.

    Args:
        directory: the directory that is to become a package name.

    Returns: a package name as string; empty if directory is no package.

    """
    # Collected innermost-first; reversed when joining.
    names_bottom_up: List[str] = []
    candidate = directory
    while _is_package(candidate):
        names_bottom_up.append(candidate.stem)
        candidate = candidate.parent
    return '.'.join(reversed(names_bottom_up))
||
517 | |||
518 | |||
519 | View Code Duplication | def _find_attribute_docstring(lines: List[str]) -> Optional[str]: |
|
520 | """ |
||
521 | Find any docstring that is right above an attribute. |
||
522 | Args: |
||
523 | lines: the lines of code that may contain a docstring. |
||
524 | |||
525 | Returns: a docstring (str) or None. |
||
526 | |||
527 | """ |
||
528 | result = None |
||
529 | if lines: |
||
530 | joined_lines = ''.join(lines).strip() |
||
531 | docstring_pattern = re.compile( |
||
532 | r'("{3}\s*([\s\S]+)\s*"{3}|' # 2: docstring content. |
||
533 | r'\'{3}\s*([\s\S]+)\s*\'{3})' # 3: docstring content. |
||
534 | r'$' |
||
535 | ) |
||
536 | match = docstring_pattern.match(joined_lines) |
||
537 | if match: |
||
538 | result = (match.group(2) or match.group(3)).strip() |
||
539 | return result |
||
540 | |||
541 | |||
542 | def _ensure_set(arg: Union[object, Iterable[object]]) -> Set[object]: |
||
543 | # Make sure that arg is a set. |
||
544 | result = arg or set() |
||
545 | if not isinstance(result, Iterable): |
||
546 | result = {result} |
||
547 | else: |
||
548 | result = set(result) |
||
549 | return result |
||
550 | |||
551 | |||
def _discover_list(
        what_: List[type],
        source: Union[Path, str, Module, Iterable[Module]],
        **kwargs: dict) -> List[type]:
    """
    Discover classes for a list-like ``what_``. The first type argument of
    ``what_`` (if any) becomes the signature that the classes must match.

    Args:
        what_: the requested list type, e.g. ``List[SomeBase]``.
        source: the source in which is searched for classes.
        **kwargs: passed on to ``discover_classes``; any 'signature' entry is
        overwritten.

    Returns: a list of discovered classes.

    """
    type_args = getattr(what_, '__args__', None) or [Any]
    requested = type_args[0]
    # A bare `type`, `Type` or an unbound TypeVar puts no constraint on the
    # classes, which is expressed as Any.
    unconstrained = (requested in (type, Type)
                     or isinstance(requested, TypeVar))  # type: ignore[arg-type] # noqa
    kwargs['signature'] = Any if unconstrained else requested
    return discover_classes(source, **kwargs)  # type: ignore[arg-type]
||
562 |