Passed
Pull Request — master (#1)
by Ramon
01:31
created

barentsz._discover   B

Complexity

Total Complexity 51

Size/Duplication

Total Lines 553
Duplicated Lines 78.48 %

Importance

Changes 0
Metric Value
eloc 262
dl 434
loc 553
rs 7.92
c 0
b 0
f 0
wmc 51

20 Functions

Rating   Name   Duplication   Size   Complexity  
A _create_attribute() 37 37 1
A discover_attributes() 31 31 3
B _discover_attributes_in_lines() 31 31 6
A _discover_packages_per_path() 29 29 3
A _path() 19 19 3
A _discover_elements() 30 30 2
A discover_module_names() 23 23 2
A discover_paths() 18 18 1
A discover() 31 36 3
A _is_package() 0 11 1
A _find_attribute_docstring() 21 21 3
A _match_attribute() 32 32 2
A _to_package_name() 0 17 2
A discover_packages() 0 11 1
A discover_modules() 27 27 4
A _discover_list() 10 10 3
A _ensure_set() 0 8 2
A discover_functions() 33 34 2
A _get_modules_from_source() 29 29 5
A discover_classes() 33 33 2


Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
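As a minimal sketch of that kind of restructuring: the listing further down repeats the check `not ....startswith('_')` in more than one place to decide whether a name is private. The helper names `_is_public` and `select_names` below are hypothetical and not part of barentsz; they only illustrate pulling a repeated condition into one function.

```python
from typing import Iterable, List


def _is_public(name: str) -> bool:
    """Hypothetical shared helper: a name is public unless it starts with '_'."""
    return not name.startswith('_')


def select_names(names: Iterable[str],
                 include_privates: bool = False) -> List[str]:
    """Keep public names, or every name when include_privates is True."""
    return [name for name in names
            if include_privates or _is_public(name)]


print(select_names(['core', '_helpers', '__main__']))
print(select_names(['core', '_helpers'], include_privates=True))
```

Each caller that filtered on the underscore prefix could then delegate to the one helper, so a change to the rule only has to be made once.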


Complexity

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like barentsz._discover (here a module of 20 functions rather than a class) often do many different things. To break such a unit down, identify a cohesive component within it. A common way to find one is to look for fields or methods that share a prefix or suffix.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
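A rough sketch of what Extract Class could look like here, assuming the functions with "package" in their names form one such component. The class name `PackageLocator` is hypothetical, and the body is a simplified stand-in that checks for `__init__.py` directly instead of calling the helpers in the listing.

```python
import tempfile
from pathlib import Path


class PackageLocator:
    """Hypothetical class grouping the package-related helpers."""

    @staticmethod
    def is_package(directory: Path) -> bool:
        # A directory counts as a package if it contains an __init__.py.
        return (directory / '__init__.py').is_file()

    def to_package_name(self, directory: Path) -> str:
        # Walk up the tree for as long as each parent is still a package.
        parts = []
        current = directory
        while self.is_package(current):
            parts.insert(0, current.name)
            current = current.parent
        return '.'.join(parts)


# Build a throwaway package tree: <tmp>/pkg/sub, both with an __init__.py.
with tempfile.TemporaryDirectory() as tmp:
    sub = Path(tmp) / 'pkg' / 'sub'
    sub.mkdir(parents=True)
    (sub.parent / '__init__.py').touch()
    (sub / '__init__.py').touch()
    name = PackageLocator().to_package_name(sub)

print(name)
```

Whether a class is the right unit for a module of plain functions is a judgment call; moving the same functions into a separate module would serve the same purpose.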

import glob
import inspect
import re
import sys
from importlib import import_module
from pathlib import Path
from typing import (
    Union,
    Dict,
    List,
    Any,
    Callable,
    Type,
    Iterable,
    Optional,
    Tuple,
    Set,
    TypeVar,
)

from typish import Module, subclass_of, instance_of

from barentsz._here import here
from barentsz._attribute import Attribute


def discover(
        source: Any = None,
        *,
        what: Any = List[type],
        **kwargs: dict,
) -> list:
    """
    Convenience function for discovering types in some source. If no source
    is given, the directory is used in which the calling module is located.

    Args:
        source: the source in which is searched or the directory of the
        caller if None.
        what: the type that is to be discovered.
        **kwargs: any keyword argument that is passed on.

    Returns: a list of discoveries.

    """
    source = source or here(1)

    # Each entry pairs an accepted type with the function that handles it.
    delegates = [
        (List[type], _discover_list),
        (list, _discover_list),
        (List, _discover_list),
    ]

    for type_, delegate in delegates:
        if subclass_of(what, type_):
            return delegate(what, source, **kwargs)

    accepted_types = ', '.join(['`{}`'.format(type_)
                                for type_, _ in delegates])
    raise ValueError('Type `{}` is not supported. This function accepts: '
                     '{}'.format(what, accepted_types))


def discover_paths(directory: Union[Path, str], pattern: str) -> List[Path]:
    """
    Return a list of Paths within the given directory that match the given
    pattern.

    Args:
        directory: the directory in which is searched for paths.
        pattern: a pattern (example: '**/*.py').

    Returns: a list of Path objects.

    """
    directory_path = _path(directory)
    abspath = str(directory_path.absolute())
    # Side effect: the absolute directory is prepended to sys.path.
    sys.path.insert(0, abspath)
    path_to_discover = directory_path.joinpath(pattern)
    return [Path(filename) for filename in
            glob.iglob(str(path_to_discover), recursive=True)]


def discover_packages(directory: Union[Path, str]) -> List[str]:
    """
    Return a list of packages within the given directory. The directory must be
    a package.
    Args:
        directory: the directory in which is searched for packages.

    Returns: a list of packages.

    """
    return list(_discover_packages_per_path(directory).values())


def discover_module_names(
        directory: Union[Path, str],
        include_privates: bool = False) -> List[str]:
    """
    Return a list of module names within the given directory. The directory
    must be a package and only names are returned of modules that are in
    packages.
    Args:
        directory: the directory in which is searched for modules.
        include_privates: if True, privates (unders and dunders) are also
        included.

    Returns: a list of module names (strings).

    """
    result = []
    packages_per_path = _discover_packages_per_path(directory)
    for path, package_name in packages_per_path.items():
        result.extend(['{}.{}'.format(package_name, p.stem)
                       for p in discover_paths(path, '*.py')
                       if p.stem != '__init__'
                       and (include_privates or not p.stem.startswith('_'))])
    return result


def discover_modules(
        directory: Union[Path, str],
        include_privates: bool = False,
        raise_on_fail: bool = False) -> List[Module]:
    """
    Return a list of modules within the given directory. The directory must be
    a package and only modules are returned that are in packages.
    Args:
        directory: the directory in which is searched for modules.
        include_privates: if True, privates (unders and dunders) are also
        included.
        raise_on_fail: if True, an ImportError is raised upon failing to
        import any module.

    Returns: a list of module objects.

    """
    modules = discover_module_names(directory, include_privates)
    result = []
    for module in modules:
        try:
            imported_module = import_module(module)
            result.append(imported_module)
        except Exception as err:
            if raise_on_fail:
                # Chain the original exception so its traceback is preserved.
                raise ImportError(err) from err
    return result


def discover_classes(
        source: Union[Path, str, Module, Iterable[Module]],
        signature: type = Any,  # type: ignore
        include_privates: bool = False,
        in_private_modules: bool = False,
        raise_on_fail: bool = False,
        exclude: Optional[Union[Iterable[type], type]] = None
) -> List[type]:
    """
    Discover any classes within the given source and according to the given
    constraints.

    Args:
        source: the source in which is searched for any classes.
        signature: only classes that inherit from signature are returned.
        include_privates: if True, private classes are included as well.
        in_private_modules: if True, private modules are explored as well.
        raise_on_fail: if True, raises an ImportError upon the first import
        failure.
        exclude: a type or multiple types that are to be excluded from the
        result.

    Returns: a list of all discovered classes (types).

    """
    exclude_ = _ensure_set(exclude)
    elements = _discover_elements(source, inspect.isclass, include_privates,
                                  in_private_modules, raise_on_fail)
    result = list({cls for cls in elements
                   if (signature is Any or subclass_of(cls, signature))
                   and cls not in exclude_})
    result.sort(key=lambda cls: cls.__name__)
    return result


def discover_functions(
        source: Union[Path, str, Module, Iterable[Module], type],
        signature: Type[Callable] = Callable,  # type: ignore
        include_privates: bool = False,
        in_private_modules: bool = False,
        raise_on_fail: bool = False) -> List[type]:
    """
    Discover any functions within the given source and according to the given
    constraints.

    Args:
        source: the source in which is searched for any functions.
        signature: only functions that have this signature (parameters and
        return type) are included.
        include_privates: if True, private functions are included as well.
        in_private_modules: if True, private modules are explored as well.
        raise_on_fail: if True, raises an ImportError upon the first import
        failure.

    Returns: a list of all discovered functions.

    """

    def filter_(*args_: Iterable[Any]) -> bool:
        return (inspect.isfunction(*args_)
                or inspect.ismethod(*args_))

    if not isinstance(source, type):
        filter_ = inspect.isfunction  # type: ignore

    elements = _discover_elements(source, filter_, include_privates,
                                  in_private_modules, raise_on_fail)
    return [elem for elem in elements
            if (signature is Callable or instance_of(elem, signature))]


def discover_attributes(
        source: Union[Path, str, Module, Iterable[Module]],
        signature: type = Any,  # type: ignore
        include_privates: bool = False,
        in_private_modules: bool = False,
        raise_on_fail: bool = False) -> List[Attribute]:
    """
    Discover any attributes within the given source and according to the given
    constraints.

    Args:
        source: the source in which is searched for any attributes.
        signature: only attributes that are subtypes of this signature are
        included.
        include_privates: if True, private attributes are included as well.
        in_private_modules: if True, private modules are explored as well.
        raise_on_fail: if True, raises an ImportError upon the first import
        failure.

    Returns: a list of all discovered attributes.

    """
    modules = _get_modules_from_source(source, in_private_modules,
                                       raise_on_fail)
    attributes: List[Attribute] = []
    for module in modules:
        with open(module.__file__) as module_file:
            lines = list(module_file)
        attributes += _discover_attributes_in_lines(
            lines, module, signature, include_privates)
    return attributes


def _discover_attributes_in_lines(
        lines: List[str],
        module: Module,
        signature: type,
        include_privates: bool) -> List[Attribute]:
    """
    Discover any attributes within the lines of code and according to the
    given constraints.

    Args:
        lines: the lines of code in which is searched for any attributes.
        module: the module from which these lines originate.
        signature: only attributes that are subtypes of this signature are
        included.
        include_privates: if True, private attributes are included as well.

    Returns: a list of all discovered attributes.

    """
    attributes = []
    for index, line in enumerate(lines):
        match = _match_attribute(line)
        if match:
            name, hint, value, comment = match
            docstring = _find_attribute_docstring(lines[0:index])
            attribute = _create_attribute(name, hint, value, docstring,
                                          comment, module, line, index + 1)
            if (instance_of(attribute.value, signature)
                    and (attribute.is_public or include_privates)):
                attributes.append(attribute)
    return attributes


def _discover_elements(
        source: Union[Path, str, Module, Iterable[Module], type],
        filter_: Callable[[Any], bool],
        include_privates: bool = False,
        in_private_modules: bool = False,
        raise_on_fail: bool = False) -> List[Any]:
    """
    Discover elements (such as attributes or functions) in the given source.
    Args:
        source: the source that is explored.
        filter_: the filter that determines the type of element.
        include_privates: if True, private elements are returned as well.
        in_private_modules: if True, private modules are examined as well.
        raise_on_fail: if True, an ImportError will be raised upon import
        failure.

    Returns: a list of elements.

    """
    if isinstance(source, type):
        sources = [source]  # type: Iterable
    else:
        sources = _get_modules_from_source(source, in_private_modules,
                                           raise_on_fail)

    elements = [elem for src in sources
                for _, elem in inspect.getmembers(src, filter_)
                if (in_private_modules or not src.__name__.startswith('_'))
                and (include_privates or not elem.__name__.startswith('_'))]
    return elements


def _discover_packages_per_path(
        directory: Union[Path, str]) -> Dict[Path, str]:
    """
    Discover packages and their original Paths within the given directory.
    Args:
        directory: the directory in which is searched for modules.

    Returns: a dict with Paths as keys and strings (the package names) as
    values.

    """
    directory_path = _path(directory)
    if not directory_path.exists():
        raise ValueError('The given directory does not exist. '
                         'Given: {}'.format(directory))
    if not _is_package(directory_path):
        raise ValueError('The given directory must itself be a package. '
                         'Given: {}'.format(directory))

    paths_to_inits = discover_paths(directory_path, '**/__init__.py')
    paths = [p.parent for p in paths_to_inits]
    packages_per_path = {p: _to_package_name(p) for p in paths}

    # All packages must have a straight line of packages from the base package.
    base_package = _to_package_name(directory_path)
    result = {path: package for path, package in packages_per_path.items()
              if package.startswith(base_package)}

    return result


def _path(directory: Union[Path, str]) -> Path:
    """
    Return a path if directory is a string or return directory if it is a Path
    already. Raise a ValueError if it is neither a Path nor a string.

    Args:
        directory: the directory that is a string or Path.

    Returns: a Path instance.

    """
    if isinstance(directory, Path):
        result = directory
    elif isinstance(directory, str):
        result = Path(directory)
    else:
        raise ValueError('Invalid type ({}) for directory, provide a Path or '
                         'a string.'.format(type(directory)))
    return result


def _get_modules_from_source(
        source: Union[Path, str, Module, Iterable[Module]],
        in_private_modules: bool = False,
        raise_on_fail: bool = False
) -> Iterable[Module]:
    """
    Get an iterable of Modules from the given source.
    Args:
        source: anything that can be turned into an iterable of Modules.
        in_private_modules: if True, private modules are explored as well.
        raise_on_fail: if True, raises an ImportError upon the first import
        failure.

    Returns: an iterable of Module instances.

    """
    if isinstance(source, Path):
        modules = discover_modules(source, in_private_modules, raise_on_fail)
    elif isinstance(source, str):
        modules = discover_modules(Path(source), in_private_modules,
                                   raise_on_fail)
    elif isinstance(source, Module):
        modules = [source]
    elif instance_of(source, Iterable[Module]):
        modules = source  # type: ignore
    else:
        raise ValueError('The given source must be a Path, string or module. '
                         'Given: {}'.format(source))
    return modules


def _match_attribute(line: str) -> Optional[Tuple[str, str, str, str]]:
    """
    Try to match the given line with an attribute and return the name,
    type hint, value and inline comment (respectively) if a match was
    found.

    Args:
        line: the line of code that (may) contain an attribute declaration.

    Returns: a tuple with strings (name, hint, value, comment) or None.

    """
    attr_pattern = re.compile(
        r'^'
        r'\s*'
        r'([a-zA-Z_]+[a-zA-Z_0-9]*)'  # 1: Name.
        r'(\s*:\s*(\w+)\s*)?'  # 3: Type hint.
        r'\s*=\s*'
        r'(.+?)'  # 4: Value.
        r'\s*'
        r'(#\s*(.*?)\s*)?'  # 6: Inline comment.
        r'$'
    )
    match = attr_pattern.match(line)
    result = None
    if match:
        attr_name = match.group(1)
        hint = match.group(3)
        attr_value = match.group(4)
        inline_comments = match.group(6)
        result = attr_name, hint, attr_value, inline_comments
    return result


def _create_attribute(
        name: str,
        hint: Optional[str],
        assigned_value: str,
        docstring: Optional[str],
        comment: Optional[str],
        module: Module,
        line: str,
        line_nr: int) -> Attribute:
    """
    Create and return an Attribute instance from the given parameters.
    Args:
        name: the name of the attribute.
        hint: the type hint of the attribute (if any).
        assigned_value: the string that was literally assigned.
        docstring: the docstring above this attribute.
        comment: an inline comment (if any).
        module: the module that contains the attribute.
        line: the line that defines the attribute.
        line_nr: the line number of the attribute.

    Returns: an Attribute instance.

    """
    value = getattr(module, name)
    type_ = type(value)
    return Attribute(
        name=name,
        type_=type_,
        value=value,
        doc=docstring,
        comment=comment,
        hint=hint,
        module=module,
        assigned_value=assigned_value,
        line=line,
        line_nr=line_nr
    )


def _is_package(directory: Path) -> bool:
    """
    Return True if the given directory is a package and False otherwise.
    Args:
        directory: the directory to check.

    Returns: True if directory is a package.

    """
    paths = discover_paths(directory, '__init__.py')
    return len(paths) > 0


def _to_package_name(directory: Path) -> str:
    """
    Translate the given directory to a package (str). Check every parent
    directory in the tree to find the complete fully qualified package name.
    Args:
        directory: the directory that is to become a package name.

    Returns: a package name as string.

    """
    parts: List[str] = []
    current_dir = directory
    while _is_package(current_dir):
        # See how far up the tree we can go while still in a package.
        parts.insert(0, current_dir.stem)
        current_dir = current_dir.parent
    return '.'.join(parts)


def _find_attribute_docstring(lines: List[str]) -> Optional[str]:
    """
    Find any docstring that is right above an attribute.
    Args:
        lines: the lines of code that may contain a docstring.

    Returns: a docstring (str) or None.

    """
    result = None
    if lines:
        joined_lines = ''.join(lines).strip()
        docstring_pattern = re.compile(
            r'("{3}\s*([\s\S]+)\s*"{3}|'  # 2: docstring content.
            r'\'{3}\s*([\s\S]+)\s*\'{3})'  # 3: docstring content.
            r'$'
        )
        match = docstring_pattern.match(joined_lines)
        if match:
            result = (match.group(2) or match.group(3)).strip()
    return result


def _ensure_set(arg: Union[object, Iterable[object]]) -> Set[object]:
    # Make sure that arg is a set.
    result = arg or set()
    if not isinstance(result, Iterable):
        result = {result}
    else:
        result = set(result)
    return result


def _discover_list(
        what_: List[type],
        source: Union[Path, str, Module, Iterable[Module]],
        **kwargs: dict) -> List[type]:
    args = getattr(what_, '__args__', None) or [Any]
    signature = args[0]
    if signature in (type, Type) or isinstance(signature, TypeVar):  # type: ignore[arg-type] # noqa
        signature = Any
    kwargs['signature'] = signature
    return discover_classes(source, **kwargs)
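
To see what the pattern in `_match_attribute` accepts, the sketch below copies the regular expression from the listing into a standalone function and runs it on a few sample lines. The function name `match_attribute_line` and the sample inputs are made up for illustration; only the pattern itself comes from the listing.

```python
import re
from typing import Optional, Tuple

# Same pattern as in _match_attribute; the group numbers match its comments.
ATTR_PATTERN = re.compile(
    r'^'
    r'\s*'
    r'([a-zA-Z_]+[a-zA-Z_0-9]*)'  # 1: Name.
    r'(\s*:\s*(\w+)\s*)?'  # 3: Type hint.
    r'\s*=\s*'
    r'(.+?)'  # 4: Value.
    r'\s*'
    r'(#\s*(.*?)\s*)?'  # 6: Inline comment.
    r'$'
)


def match_attribute_line(
        line: str) -> Optional[Tuple[str, Optional[str], str, Optional[str]]]:
    """Return (name, hint, value, comment) for an assignment line, else None."""
    match = ATTR_PATTERN.match(line)
    if not match:
        return None
    return match.group(1), match.group(3), match.group(4), match.group(6)


with_hint = match_attribute_line('x: int = 5  # comment\n')
without_hint = match_attribute_line("name = 'value'")
no_assignment = match_attribute_line('def f():')

print(with_hint)
print(without_hint)
print(no_assignment)
```

The type hint and the inline comment are both optional groups, so they come back as None when absent. Because the pattern starts with `\s*`, indented assignments also match.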