Passed
Push — master ( 18b92d...e7c449 )
by Ramon
50s queued 12s
created

barentsz._discover   C

Complexity

Total Complexity 54

Size/Duplication

Total Lines 562
Duplicated Lines 78.47 %

Importance

Changes 0
Metric Value
eloc 271
dl 441
loc 562
rs 6.4799
c 0
b 0
f 0
wmc 54

20 Functions

Rating   Name   Duplication   Size   Complexity  
A discover() 31 36 3
A _create_attribute() 37 37 1
A discover_attributes() 32 32 4
B _discover_attributes_in_lines() 31 31 6
A _discover_packages_per_path() 29 29 3
A _path() 19 19 3
A _discover_elements() 30 30 2
A discover_module_names() 23 24 2
A discover_paths() 20 20 1
A _is_package() 0 11 1
A _find_attribute_docstring() 21 21 3
A _match_attribute() 32 32 2
A _to_package_name() 0 17 2
A discover_packages() 0 13 1
A discover_modules() 28 28 5
A _discover_list() 10 10 3
A _ensure_set() 0 8 2
A discover_functions() 36 36 3
A _get_modules_from_source() 29 29 5
A discover_classes() 33 33 2

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like barentsz._discover often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common way to find such a component is to look for fields or methods that share the same prefixes or suffixes.
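A rough way to apply this heuristic to the function table above is to group the names by a word they share. This is a looser variant of the prefix/suffix idea, and the keywords chosen here (`attribute`, `package`, `module`) are an assumption made for the sketch, not something the report prescribes.

```python
from typing import Dict, Iterable, List

# Function names copied from the report's function table.
FUNCTION_NAMES = [
    'discover', '_create_attribute', 'discover_attributes',
    '_discover_attributes_in_lines', '_discover_packages_per_path',
    '_path', '_discover_elements', 'discover_module_names',
    'discover_paths', '_is_package', '_find_attribute_docstring',
    '_match_attribute', '_to_package_name', 'discover_packages',
    'discover_modules', '_discover_list', '_ensure_set',
    'discover_functions', '_get_modules_from_source', 'discover_classes',
]


def group_by_keyword(names: Iterable[str],
                     keywords: Iterable[str]) -> Dict[str, List[str]]:
    """Map each keyword to the names that contain it."""
    names = list(names)
    return {kw: [n for n in names if kw in n] for kw in keywords}


groups = group_by_keyword(FUNCTION_NAMES, ['attribute', 'package', 'module'])
for keyword, members in groups.items():
    print(keyword, len(members), members)
```

Groups with several members, such as the attribute-related functions, are candidates for a cohesive component.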

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
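As a hedged sketch of what Extract Class could look like here, the attribute-matching logic can be bundled into a small class. `AttributeMatcher` is a hypothetical name, not part of barentsz. The regular expression is copied from `_match_attribute` in the listing, and the class only covers the matching step.

```python
import re
from typing import Optional, Tuple

Match = Tuple[str, Optional[str], str, Optional[str]]


class AttributeMatcher:
    """Hypothetical class that owns the attribute pattern."""

    # Compiled once at class creation instead of on every call.
    _PATTERN = re.compile(
        r'^'
        r'\s*'
        r'([a-zA-Z_]+[a-zA-Z_0-9]*)'  # 1: Name.
        r'(\s*:\s*(\w+)\s*)?'  # 3: Type hint.
        r'\s*=\s*'
        r'(.+?)'  # 4: Value.
        r'\s*'
        r'(#\s*(.*?)\s*)?'  # 6: Inline comment.
        r'$'
    )

    def match(self, line: str) -> Optional[Match]:
        """Return (name, hint, value, comment) or None if no match."""
        found = self._PATTERN.match(line)
        if found is None:
            return None
        return (found.group(1), found.group(3),
                found.group(4), found.group(6))


matcher = AttributeMatcher()
print(matcher.match('MAX_RETRIES: int = 3  # upper bound'))
print(matcher.match('def f():'))
```

The docstring lookup and `Attribute` construction could move into the same class in a later step, if they turn out to belong together.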

import glob
import inspect
import re
import sys
from importlib import import_module
from pathlib import Path
from typing import (
    Union,
    Dict,
    List,
    Any,
    Callable,
    Type,
    Iterable,
    Optional,
    Tuple,
    Set,
    TypeVar,
)

from typish import Module, subclass_of, instance_of

from barentsz._here import here
from barentsz._attribute import Attribute


def discover(
        source: Any = None,
        *,
        what: Any = List[type],
        **kwargs: dict,
) -> list:
    """
    Convenience function for discovering types in some source. If no source
    is given, the directory is used in which the calling module is located.

    Args:
        source: the source in which is searched or the directory of the
        caller if None.
        what: the type that is to be discovered.
        **kwargs: any keyword argument that is passed on.

    Returns: a list of discoveries.

    """
    source = source or here(1)

    delegates = [
        (List[type], _discover_list),
        (list, _discover_list),
        (List, _discover_list),
    ]

    for tuple_ in delegates:
        type_, delegate = tuple_
        if subclass_of(what, type_):
            return delegate(what, source, **kwargs)

    # The first element of each tuple is the accepted type.
    accepted_types = ', '.join(['`{}`'.format(type_)
                                for type_, _ in delegates])
    raise ValueError('Type `{}` is not supported. This function accepts: '
                     '{}'.format(what, accepted_types))


def discover_paths(directory: Union[Path, str], pattern: str) -> List[Path]:
    """
    Return a list of Paths within the given directory that match the given
    pattern.

    Args:
        directory: the directory in which is searched for paths.
        pattern: a pattern (example: '**/*.py').

    Returns: a list of Path objects.

    """
    directory_path = _path(directory)
    abspath = str(directory_path.absolute())
    sys.path.insert(0, abspath)
    path_to_discover = directory_path.joinpath(pattern)
    result = [Path(filename) for filename in
              glob.iglob(str(path_to_discover), recursive=True)]
    result.sort()
    return result


def discover_packages(directory: Union[Path, str]) -> List[str]:
    """
    Return a list of packages within the given directory. The directory must be
    a package.
    Args:
        directory: the directory in which is searched for packages.

    Returns: a list of packages.

    """
    result = list(_discover_packages_per_path(directory).values())
    result.sort()
    return result


def discover_module_names(
        directory: Union[Path, str],
        include_privates: bool = False) -> List[str]:
    """
    Return a list of module names within the given directory. The directory
    must be a package and only names are returned of modules that are in
    packages.
    Args:
        directory: the directory in which is searched for modules.
        include_privates: if True, privates (unders and dunders) are also
        included.

    Returns: a list of module names (strings).

    """
    result = []
    packages_per_path = _discover_packages_per_path(directory)
    for path, package_name in packages_per_path.items():
        result.extend(['{}.{}'.format(package_name, p.stem)
                       for p in discover_paths(path, '*.py')
                       if p.stem != '__init__'
                       and (include_privates or not p.stem.startswith('_'))])
    result.sort()
    return result


def discover_modules(
        directory: Union[Path, str],
        include_privates: bool = False,
        raise_on_fail: bool = False) -> List[Module]:
    """
    Return a list of modules within the given directory. The directory must be
    a package and only modules are returned that are in packages.
    Args:
        directory: the directory in which is searched for modules.
        include_privates: if True, privates (unders and dunders) are also
        included.
        raise_on_fail: if True, an ImportError is raised upon failing to
        import any module.

    Returns: a list of module objects.

    """
    modules = discover_module_names(directory, include_privates)
    result = []
    for module in modules:
        try:
            imported_module = import_module(module)
            result.append(imported_module)
        except Exception as err:
            if raise_on_fail:
                # Chain the original error so its traceback is preserved.
                raise ImportError(err) from err
    result.sort(key=lambda module: module.__name__)
    return result


def discover_classes(
        source: Union[Path, str, Module, Iterable[Module]],
        signature: type = Any,  # type: ignore
        include_privates: bool = False,
        in_private_modules: bool = False,
        raise_on_fail: bool = False,
        exclude: Optional[Union[Iterable[type], type]] = None
) -> List[type]:
    """
    Discover any classes within the given source and according to the given
    constraints.

    Args:
        source: the source in which is searched for any classes.
        signature: only classes that inherit from signature are returned.
        include_privates: if True, private classes are included as well.
        in_private_modules: if True, private modules are explored as well.
        raise_on_fail: if True, raises an ImportError upon the first import
        failure.
        exclude: a type or multiple types that are to be excluded from the
        result.

    Returns: a list of all discovered classes (types).

    """
    exclude_ = _ensure_set(exclude)
    elements = _discover_elements(source, inspect.isclass, include_privates,
                                  in_private_modules, raise_on_fail)
    result = list({cls for cls in elements
                   if (signature is Any or subclass_of(cls, signature))
                   and cls not in exclude_})
    result.sort(key=lambda cls: cls.__name__)
    return result


def discover_functions(
        source: Union[Path, str, Module, Iterable[Module], type],
        signature: Type[Callable] = Callable,  # type: ignore
        include_privates: bool = False,
        in_private_modules: bool = False,
        raise_on_fail: bool = False) -> List[type]:
    """
    Discover any functions within the given source and according to the given
    constraints.

    Args:
        source: the source in which is searched for any functions.
        signature: only functions that have this signature (parameters and
        return type) are included.
        include_privates: if True, private functions are included as well.
        in_private_modules: if True, private modules are explored as well.
        raise_on_fail: if True, raises an ImportError upon the first import
        failure.

    Returns: a list of all discovered functions.

    """

    def filter_(*args_: Iterable[Any]) -> bool:
        return (inspect.isfunction(*args_)
                or inspect.ismethod(*args_))

    if not isinstance(source, type):
        filter_ = inspect.isfunction  # type: ignore

    elements = _discover_elements(source, filter_, include_privates,
                                  in_private_modules, raise_on_fail)
    result = [elem for elem in elements
              if (signature is Callable or instance_of(elem, signature))]
    result.sort(key=lambda func: func.__name__)
    return result


def discover_attributes(
        source: Union[Path, str, Module, Iterable[Module]],
        signature: type = Any,  # type: ignore
        include_privates: bool = False,
        in_private_modules: bool = False,
        raise_on_fail: bool = False) -> List[Attribute]:
    """
    Discover any attributes within the given source and according to the given
    constraints.

    Args:
        source: the source in which is searched for any attributes.
        signature: only attributes that are subtypes of this signature are
        included.
        include_privates: if True, private attributes are included as well.
        in_private_modules: if True, private modules are explored as well.
        raise_on_fail: if True, raises an ImportError upon the first import
        failure.

    Returns: a list of all discovered attributes.

    """
    modules = _get_modules_from_source(source, in_private_modules,
                                       raise_on_fail)
    attributes: List[Attribute] = []
    for module in modules:
        with open(module.__file__) as module_file:
            lines = list(module_file)
        attributes += _discover_attributes_in_lines(
            lines, module, signature, include_privates)
    attributes.sort(key=lambda attr: attr.name)
    return attributes


def _discover_attributes_in_lines(
        lines: List[str],
        module: Module,
        signature: type,
        include_privates: bool) -> List[Attribute]:
    """
    Discover any attributes within the lines of code and according to the
    given constraints.

    Args:
        lines: the lines of code in which is searched for any attributes.
        module: the module from which these lines originate.
        signature: only attributes that are subtypes of this signature are
        included.
        include_privates: if True, private attributes are included as well.

    Returns: a list of all discovered attributes.

    """
    attributes = []
    for index, line in enumerate(lines):
        match = _match_attribute(line)
        if match:
            name, hint, value, comment = match
            docstring = _find_attribute_docstring(lines[0:index])
            attribute = _create_attribute(name, hint, value, docstring,
                                          comment, module, line, index + 1)
            if (instance_of(attribute.value, signature)
                    and (attribute.is_public or include_privates)):
                attributes.append(attribute)
    return attributes


def _discover_elements(
        source: Union[Path, str, Module, Iterable[Module], type],
        filter_: Callable[[Any], bool],
        include_privates: bool = False,
        in_private_modules: bool = False,
        raise_on_fail: bool = False) -> List[Any]:
    """
    Discover elements (such as classes or functions) in the given source.
    Args:
        source: the source that is explored.
        filter_: the filter that determines the type of element.
        include_privates: if True, private elements are returned as well.
        in_private_modules: if True, private modules are examined as well.
        raise_on_fail: if True, an ImportError will be raised upon import
        failure.

    Returns: a list of elements.

    """
    if isinstance(source, type):
        sources = [source]  # type: Iterable
    else:
        sources = _get_modules_from_source(source, in_private_modules,
                                           raise_on_fail)

    elements = [elem for src in sources
                for _, elem in inspect.getmembers(src, filter_)
                if (in_private_modules or not src.__name__.startswith('_'))
                and (include_privates or not elem.__name__.startswith('_'))]
    return elements


def _discover_packages_per_path(
        directory: Union[Path, str]) -> Dict[Path, str]:
    """
    Discover packages and their original Paths within the given directory.
    Args:
        directory: the directory in which is searched for modules.

    Returns: a dict with Paths as keys and strings (the package names) as
    values.

    """
    directory_path = _path(directory)
    if not directory_path.exists():
        raise ValueError('The given directory does not exist. '
                         'Given: {}'.format(directory))
    if not _is_package(directory_path):
        raise ValueError('The given directory must itself be a package. '
                         'Given: {}'.format(directory))

    paths_to_inits = discover_paths(directory_path, '**/__init__.py')
    paths = [p.parent for p in paths_to_inits]
    packages_per_path = {p: _to_package_name(p) for p in paths}

    # All packages must have a straight line of packages from the base package.
    base_package = _to_package_name(directory_path)
    result = {path: package for path, package in packages_per_path.items()
              if package.startswith(base_package)}

    return result


def _path(directory: Union[Path, str]) -> Path:
    """
    Return a path if directory is a string or return directory if it is a Path
    already. Raise a ValueError if it is neither a Path nor a string.

    Args:
        directory: the directory that is a string or Path.

    Returns: a Path instance.

    """
    if isinstance(directory, Path):
        result = directory
    elif isinstance(directory, str):
        result = Path(directory)
    else:
        raise ValueError('Invalid type ({}) for directory, provide a Path or '
                         'a string.'.format(type(directory)))
    return result


def _get_modules_from_source(
        source: Union[Path, str, Module, Iterable[Module]],
        in_private_modules: bool = False,
        raise_on_fail: bool = False
) -> Iterable[Module]:
    """
    Get an iterable of Modules from the given source.
    Args:
        source: anything that can be turned into an iterable of Modules.
        in_private_modules: if True, private modules are explored as well.
        raise_on_fail: if True, raises an ImportError upon the first import
        failure.

    Returns: an iterable of Module instances.

    """
    if isinstance(source, Path):
        modules = discover_modules(source, in_private_modules, raise_on_fail)
    elif isinstance(source, str):
        modules = discover_modules(Path(source), in_private_modules,
                                   raise_on_fail)
    elif isinstance(source, Module):
        modules = [source]
    elif instance_of(source, Iterable[Module]):
        modules = source  # type: ignore
    else:
        raise ValueError('The given source must be a Path, string or module. '
                         'Given: {}'.format(source))
    return modules


def _match_attribute(line: str) -> Optional[Tuple[str, str, str, str]]:
    """
    Try to match the given line with an attribute and return the name,
    type hint, value and inline comment (respectively) if a match was
    found.

    Args:
        line: the line of code that (may) contain an attribute declaration.

    Returns: a tuple with strings (name, hint, value, comment) or None.

    """
    attr_pattern = re.compile(
        r'^'
        r'\s*'
        r'([a-zA-Z_]+[a-zA-Z_0-9]*)'  # 1: Name.
        r'(\s*:\s*(\w+)\s*)?'  # 3: Type hint.
        r'\s*=\s*'
        r'(.+?)'  # 4: Value.
        r'\s*'
        r'(#\s*(.*?)\s*)?'  # 6: Inline comment.
        r'$'
    )
    match = attr_pattern.match(line)
    result = None
    if match:
        attr_name = match.group(1)
        hint = match.group(3)
        attr_value = match.group(4)
        inline_comments = match.group(6)
        result = attr_name, hint, attr_value, inline_comments
    return result


def _create_attribute(
        name: str,
        hint: Optional[str],
        assigned_value: str,
        docstring: Optional[str],
        comment: Optional[str],
        module: Module,
        line: str,
        line_nr: int) -> Attribute:
    """
    Create and return an Attribute instance from the given parameters.
    Args:
        name: the name of the attribute.
        hint: the type hint of the attribute (if any).
        assigned_value: the string that was literally assigned.
        docstring: the docstring above this attribute.
        comment: an inline comment (if any).
        module: the module that contains the attribute.
        line: the line that defines the attribute.
        line_nr: the line number of the attribute.

    Returns: an Attribute instance.

    """
    value = getattr(module, name)
    type_ = type(value)
    return Attribute(
        name=name,
        type_=type_,
        value=value,
        doc=docstring,
        comment=comment,
        hint=hint,
        module=module,
        assigned_value=assigned_value,
        line=line,
        line_nr=line_nr
    )


def _is_package(directory: Path) -> bool:
    """
    Return True if the given directory is a package and False otherwise.
    Args:
        directory: the directory to check.

    Returns: True if directory is a package.

    """
    paths = discover_paths(directory, '__init__.py')
    return len(paths) > 0


def _to_package_name(directory: Path) -> str:
    """
    Translate the given directory to a package (str). Check every parent
    directory in the tree to find the complete fully qualified package name.
    Args:
        directory: the directory that is to become a package name.

    Returns: a package name as string.

    """
    parts: List[str] = []
    current_dir = directory
    while _is_package(current_dir):
        # See how far up the tree we can go while still in a package.
        parts.insert(0, current_dir.stem)
        current_dir = current_dir.parent
    return '.'.join(parts)


def _find_attribute_docstring(lines: List[str]) -> Optional[str]:
    """
    Find any docstring that is right above an attribute.
    Args:
        lines: the lines of code that may contain a docstring.

    Returns: a docstring (str) or None.

    """
    result = None
    if lines:
        joined_lines = ''.join(lines).strip()
        docstring_pattern = re.compile(
            r'("{3}\s*([\s\S]+)\s*"{3}|'  # 2: docstring content.
            r'\'{3}\s*([\s\S]+)\s*\'{3})'  # 3: docstring content.
            r'$'
        )
        match = docstring_pattern.match(joined_lines)
        if match:
            result = (match.group(2) or match.group(3)).strip()
    return result


def _ensure_set(arg: Union[object, Iterable[object]]) -> Set[object]:
    # Make sure that arg is a set.
    result = arg or set()
    if not isinstance(result, Iterable):
        result = {result}
    else:
        result = set(result)
    return result


def _discover_list(
        what_: List[type],
        source: Union[Path, str, Module, Iterable[Module]],
        **kwargs: dict) -> List[type]:
    args = getattr(what_, '__args__', None) or [Any]
    signature = args[0]
    if signature in (type, Type) or isinstance(signature, TypeVar):  # type: ignore[arg-type] # noqa
        signature = Any
    kwargs['signature'] = signature
    return discover_classes(source, **kwargs)  # type: ignore[arg-type]

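The package-name resolution in `_to_package_name` walks up the directory tree for as long as each directory is a package. The sketch below is a simplified, standalone re-implementation for experimentation, not the library's code. It is an assumption of this sketch that checking for an `__init__.py` file directly is equivalent to the glob-based `_is_package`, and it uses `Path.name` where the listing uses `Path.stem`. The names `to_package_name` and `demo` are hypothetical.

```python
import tempfile
from pathlib import Path


def to_package_name(directory: Path) -> str:
    """Build a dotted package name by walking up while still in a package."""
    parts = []
    current = directory
    # A directory counts as a package here if it holds an __init__.py.
    while (current / '__init__.py').is_file():
        parts.insert(0, current.name)
        current = current.parent
    return '.'.join(parts)


def demo() -> str:
    """Create <tmp>/pkg/sub with __init__.py files and resolve its name."""
    with tempfile.TemporaryDirectory() as tmp:
        sub = Path(tmp) / 'pkg' / 'sub'
        sub.mkdir(parents=True)
        (Path(tmp) / 'pkg' / '__init__.py').touch()
        (sub / '__init__.py').touch()
        return to_package_name(sub)


print(demo())
```

The walk stops at the first parent without an `__init__.py`, which is why the temporary directory itself does not appear in the resulting name.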