Passed
Pull Request — master (#1)
by Ramon
01:44 queued 11s
created

barentsz._discover._ensure_set()   A

Complexity

Conditions 2

Size

Total Lines 8
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 6
nop 1
dl 0
loc 8
rs 10
c 0
b 0
f 0
1
import glob
2
import inspect
3
import re
4
import sys
5
from importlib import import_module
6
from pathlib import Path
7
from typing import (
8
    Union,
9
    Dict,
10
    List,
11
    Any,
12
    Callable,
13
    Type,
14
    Iterable,
15
    Optional,
16
    Tuple,
17
    Set,
18
    TypeVar,
19
)
20
21
from typish import Module, subclass_of, instance_of
22
23
from barentsz._here import here
24
from barentsz._attribute import Attribute
25
26
27 View Code Duplication
def discover(source: Any = None, *, what: Any = List[type], **kwargs) -> list:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
28
    """
29
    Convenience function for discovering types in some source. If not source
30
    is given, the directory is used in which the calling module is located.
31
32
    Args:
33
        source: the source in which is searched or the directory of the
34
        caller if None.
35
        what: the type that is to be discovered.
36
        **kwargs: any keyword argument that is passed on.
37
38
    Returns: a list of discoveries.
39
40
    """
41
    source = source or here(1)
42
43
    delegates = [
44
        (List[type], _discover_list),
45
        (list, _discover_list),
46
        (List, _discover_list),
47
    ]
48
49
    for tuple_ in delegates:
50
        type_, delegate = tuple_
51
        if subclass_of(what, type_):
52
            return delegate(what, source, **kwargs)
53
    else:
54
        accepted_types = ', '.join(['`{}`'.format(delegate)
55
                                    for delegate, _ in delegates])
56
        raise ValueError('Type `{}` is not supported. This function accepts: '
57
                         '{}'.format(what, accepted_types))
58
59
60 View Code Duplication
def discover_paths(directory: Union[Path, str], pattern: str) -> List[Path]:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
61
    """
62
    Return a list of Paths within the given directory that match the given
63
    pattern.
64
65
    Args:
66
        directory: the directory in which is searched for paths.
67
        pattern: a pattern (example: '**/*.py').
68
69
    Returns: a list of Path objects.
70
71
    """
72
    directory_path = _path(directory)
73
    abspath = str(directory_path.absolute())
74
    sys.path.insert(0, abspath)
75
    path_to_discover = directory_path.joinpath(pattern)
76
    return [Path(filename) for filename in
77
            glob.iglob(str(path_to_discover), recursive=True)]
78
79
80
def discover_packages(directory: Union[Path, str]) -> List[str]:
81
    """
82
    Return a list of packages within the given directory. The directory must be
83
    a package.
84
    Args:
85
        directory: the directory in which is searched for packages.
86
87
    Returns: a list of packages.
88
89
    """
90
    return list(_discover_packages_per_path(directory).values())
91
92
93 View Code Duplication
def discover_module_names(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
94
        directory: Union[Path, str],
95
        include_privates: bool = False) -> List[str]:
96
    """
97
    Return a list of module names within the given directory. The directory
98
    must be a package and only names are returned of modules that are in
99
    packages.
100
    Args:
101
        directory: the directory in which is searched for modules.
102
        include_privates: if True, privates (unders and dunders) are also
103
        included.
104
105
    Returns: a list of module names (strings).
106
107
    """
108
    result = []
109
    packages_per_path = _discover_packages_per_path(directory)
110
    for path, package_name in packages_per_path.items():
111
        result.extend(['{}.{}'.format(package_name, p.stem)
112
                       for p in discover_paths(path, '*.py')
113
                       if p.stem != '__init__'
114
                       and (include_privates or not p.stem.startswith('_'))])
115
    return result
116
117
118 View Code Duplication
def discover_modules(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
119
        directory: Union[Path, str],
120
        include_privates: bool = False,
121
        raise_on_fail: bool = False) -> List[Module]:
122
    """
123
    Return a list of modules within the given directory. The directory must be
124
    a package and only modules are returned that are in packages.
125
    Args:
126
        directory: the directory in which is searched for modules.
127
        include_privates: if True, privates (unders and dunders) are also
128
        included.
129
        raise_on_fail: if True, an ImportError is raised upon failing to
130
        import any module.
131
132
    Returns: a list of module objects.
133
134
    """
135
    modules = discover_module_names(directory, include_privates)
136
    result = []
137
    for module in modules:
138
        try:
139
            imported_module = import_module(module)
140
            result.append(imported_module)
141
        except Exception as err:
142
            if raise_on_fail:
143
                raise ImportError(err)
144
    return result
145
146
147 View Code Duplication
def discover_classes(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
148
        source: Union[Path, str, Module, Iterable[Module]],
149
        signature: type = Any,  # type: ignore
150
        include_privates: bool = False,
151
        in_private_modules: bool = False,
152
        raise_on_fail: bool = False,
153
        exclude: Union[Iterable[type], type] = None
154
) -> List[type]:
155
    """
156
    Discover any classes within the given source and according to the given
157
    constraints.
158
159
    Args:
160
        source: the source in which is searched for any classes.
161
        signature: only classes that inherit from signature are returned.
162
        include_privates: if True, private classes are included as well.
163
        in_private_modules: if True, private modules are explored as well.
164
        raise_on_fail: if True, raises an ImportError upon the first import
165
        failure.
166
        exclude: a type or multiple types that are to be excluded from the
167
        result.
168
169
    Returns: a list of all discovered classes (types).
170
171
    """
172
    exclude = _ensure_set(exclude)
173
    elements = _discover_elements(source, inspect.isclass, include_privates,
174
                                  in_private_modules, raise_on_fail)
175
    result = list({cls for cls in elements
176
                   if (signature is Any or subclass_of(cls, signature))
177
                   and cls not in exclude})
178
    result.sort(key=lambda cls: cls.__name__)
179
    return result
180
181
182 View Code Duplication
def discover_functions(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
183
        source: Union[Path, str, Module, Iterable[Module], type],
184
        signature: Type[Callable] = Callable,  # type: ignore
185
        include_privates: bool = False,
186
        in_private_modules: bool = False,
187
        raise_on_fail: bool = False) -> List[type]:
188
    """
189
    Discover any functions within the given source and according to the given
190
    constraints.
191
192
    Args:
193
        source: the source in which is searched for any functions.
194
        signature: only functions that have this signature (parameters and
195
        return type) are included.
196
        include_privates: if True, private functions are included as well.
197
        in_private_modules: if True, private modules are explored as well.
198
        raise_on_fail: if True, raises an ImportError upon the first import
199
        failure.
200
201
    Returns: a list of all discovered functions.
202
203
    """
204
205
    def filter_(*args_: Iterable[Any]) -> bool:
206
        return (inspect.isfunction(*args_)
207
                or inspect.ismethod(*args_))
208
209
    if not isinstance(source, type):
210
        filter_ = inspect.isfunction  # type: ignore
211
212
    elements = _discover_elements(source, filter_, include_privates,
213
                                  in_private_modules, raise_on_fail)
214
    return [elem for elem in elements
215
            if (signature is Callable or instance_of(elem, signature))]
216
217
218 View Code Duplication
def discover_attributes(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
219
        source: Union[Path, str, Module, Iterable[Module]],
220
        signature: type = Any,  # type: ignore
221
        include_privates: bool = False,
222
        in_private_modules: bool = False,
223
        raise_on_fail: bool = False) -> List[Attribute]:
224
    """
225
    Discover any attributes within the given source and according to the given
226
    constraints.
227
228
    Args:
229
        source: the source in which is searched for any attributes.
230
        signature: only attributes that are subtypes of this signature are
231
        included.
232
        include_privates: if True, private attributes are included as well.
233
        in_private_modules: if True, private modules are explored as well.
234
        raise_on_fail: if True, raises an ImportError upon the first import
235
        failure.
236
237
    Returns: a list of all discovered attributes.
238
239
    """
240
    modules = _get_modules_from_source(source, in_private_modules,
241
                                       raise_on_fail)
242
    attributes: List[Attribute] = []
243
    for module in modules:
244
        with open(module.__file__) as module_file:
245
            lines = list(module_file)
246
        attributes += _discover_attributes_in_lines(
247
            lines, module, signature, include_privates)
248
    return attributes
249
250
251 View Code Duplication
def _discover_attributes_in_lines(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
252
        lines: List[str],
253
        module: Module,
254
        signature: type,
255
        include_privates: bool) -> List[Attribute]:
256
    """
257
    Discover any attributes within the lines of codee and according to the
258
    given constraints.
259
260
    Args:
261
        lines: the lines of code in which is searched for any attributes.
262
        module: the module from which these lines originate.
263
        signature: only attributes that are subtypes of this signature are
264
        included.
265
        include_privates: if True, private attributes are included as well.
266
267
    Returns: a list of all discovered attributes.
268
269
    """
270
    attributes = []
271
    for index, line in enumerate(lines):
272
        match = _match_attribute(line)
273
        if match:
274
            name, hint, value, comment = match
275
            docstring = _find_attribute_docstring(lines[0:index])
276
            attribute = _create_attribute(name, hint, value, docstring,
277
                                          comment, module, line, index + 1)
278
            if (instance_of(attribute.value, signature)
279
                    and (attribute.is_public or include_privates)):
280
                attributes.append(attribute)
281
    return attributes
282
283
284 View Code Duplication
def _discover_elements(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
285
        source: Union[Path, str, Module, Iterable[Module], type],
286
        filter_: Callable[[Any], bool],
287
        include_privates: bool = False,
288
        in_private_modules: bool = False,
289
        raise_on_fail: bool = False) -> List[Any]:
290
    """
291
    Discover elements (such as attributes or functions) in the given source.
292
    Args:
293
        source: the source that is explored.
294
        filter_: the filter that determines the type of element.
295
        include_privates: if True, private elements are returned as well.
296
        in_private_modules: if True, private modules are examined as well.
297
        raise_on_fail: if True, an ImportError will be raised upon import
298
        failure.
299
300
    Returns: a list of elements.
301
302
    """
303
    if isinstance(source, type):
304
        sources = [source]  # type: Iterable
305
    else:
306
        sources = _get_modules_from_source(source, in_private_modules,
307
                                           raise_on_fail)
308
309
    elements = [elem for src in sources
310
                for _, elem in inspect.getmembers(src, filter_)
311
                if (in_private_modules or not src.__name__.startswith('_'))
312
                and (include_privates or not elem.__name__.startswith('_'))]
313
    return elements
314
315
316 View Code Duplication
def _discover_packages_per_path(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
317
        directory: Union[Path, str]) -> Dict[Path, str]:
318
    """
319
    Discover packages and their original Paths within the given directory.
320
    Args:
321
        directory: the directory in which is searched for modules.
322
323
    Returns: a dict with Paths as keys and strings (the package names) as
324
    values.
325
326
    """
327
    directory_path = _path(directory)
328
    if not directory_path.exists():
329
        raise ValueError('The given directory does not exist. '
330
                         'Given: {}'.format(directory))
331
    if not _is_package(directory_path):
332
        raise ValueError('The given directory must itself be a package. '
333
                         'Given: {}'.format(directory))
334
335
    paths_to_inits = discover_paths(directory_path, '**/__init__.py')
336
    paths = [p.parent for p in paths_to_inits]
337
    packages_per_path = {p: _to_package_name(p) for p in paths}
338
339
    # All packages must have a straight line of packages from the base package.
340
    base_package = _to_package_name(directory_path)
341
    result = {path: package for path, package in packages_per_path.items()
342
              if package.startswith(base_package)}
343
344
    return result
345
346
347 View Code Duplication
def _path(directory: Union[Path, str]) -> Path:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
348
    """
349
    Return a path if directory is a string or return directory if it is a Path
350
    already. Raise a ValueError if it is neither a Path nor a string.
351
352
    Args:
353
        directory: the directory that is a string or Path.
354
355
    Returns: a Path instance.
356
357
    """
358
    if isinstance(directory, Path):
359
        result = directory
360
    elif isinstance(directory, str):
361
        result = Path(directory)
362
    else:
363
        raise ValueError('Invalid type ({}) for directory, provide a Path or '
364
                         'a string.'.format(type(directory)))
365
    return result
366
367
368 View Code Duplication
def _get_modules_from_source(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
369
        source: Union[Path, str, Module, Iterable[Module]],
370
        in_private_modules: bool = False,
371
        raise_on_fail: bool = False
372
) -> Iterable[Module]:
373
    """
374
    Get an iterable of Modules from the given source.
375
    Args:
376
        source: anything that can be turned into an iterable of Modules.
377
        in_private_modules: if True, private modules are explored as well.
378
        raise_on_fail: if True, raises an ImportError upon the first import
379
        failure.
380
381
    Returns: an iterable of Module instances.
382
383
    """
384
    if isinstance(source, Path):
385
        modules = discover_modules(source, in_private_modules, raise_on_fail)
386
    elif isinstance(source, str):
387
        modules = discover_modules(Path(source), in_private_modules,
388
                                   raise_on_fail)
389
    elif isinstance(source, Module):
390
        modules = [source]
391
    elif instance_of(source, Iterable[Module]):
392
        modules = source  # type: ignore
393
    else:
394
        raise ValueError('The given source must be a Path, string or module. '
395
                         'Given: {}'.format(source))
396
    return modules
397
398
399 View Code Duplication
def _match_attribute(line: str) -> Optional[Tuple[str, str, str, str]]:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
400
    """
401
    Try to match the given line with an attribute and return the name,
402
    type hint, value and inline comment (respectively) if a match was
403
    found.
404
405
    Args:
406
        line: the line of code that (may) contain an attribute declaration.
407
408
    Returns: a tuple with strings (name, hint, value, comment) or None.
409
410
    """
411
    attr_pattern = re.compile(
412
        r'^'
413
        r'\s*'
414
        r'([a-zA-Z_]+[a-zA-Z_0-9]*)'  # 1: Name.
415
        r'(\s*:\s*(\w+)\s*)?'  # 3: Type hint.
416
        r'\s*=\s*'
417
        r'(.+?)'  # 4: Value.
418
        r'\s*'
419
        r'(#\s*(.*?)\s*)?'  # 6: Inline comment.
420
        r'$'
421
    )
422
    match = attr_pattern.match(line)
423
    result = None
424
    if match:
425
        attr_name = match.group(1)
426
        hint = match.group(3)
427
        attr_value = match.group(4)
428
        inline_comments = match.group(6)
429
        result = attr_name, hint, attr_value, inline_comments
430
    return result
431
432
433 View Code Duplication
def _create_attribute(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
434
        name: str,
435
        hint: Optional[str],
436
        assigned_value: str,
437
        docstring: Optional[str],
438
        comment: Optional[str],
439
        module: Module,
440
        line: str,
441
        line_nr: int) -> Attribute:
442
    """
443
    Create and return an Attribute instance from the given parameters.
444
    Args:
445
        name: the name of the attribute.
446
        hint: the type hint of the attribute (if any).
447
        assigned_value: the string that was literally assigned.
448
        docstring: the docstring above this attribute.
449
        comment: an inline comment (if any).
450
        module: the module that contains the attribute.
451
        line: the line that defines the attribute.
452
        line_nr: the line number of the attribute.
453
454
    Returns: an Attribute instance.
455
456
    """
457
    value = getattr(module, name)
458
    type_ = type(value)
459
    return Attribute(
460
        name=name,
461
        type_=type_,
462
        value=value,
463
        doc=docstring,
464
        comment=comment,
465
        hint=hint,
466
        module=module,
467
        assigned_value=assigned_value,
468
        line=line,
469
        line_nr=line_nr
470
    )
471
472
473
def _is_package(directory: Path) -> bool:
474
    """
475
    Return True if the given directory is a package and False otherwise.
476
    Args:
477
        directory: the directory to check.
478
479
    Returns: True if directory is a package.
480
481
    """
482
    paths = discover_paths(directory, '__init__.py')
483
    return len(paths) > 0
484
485
486
def _to_package_name(directory: Path) -> str:
487
    """
488
    Translate the given directory to a package (str). Check every parent
489
    directory in the tree to find the complete fully qualified package name.
490
    Args:
491
        directory: the directory that is to become a package name.
492
493
    Returns: a package name as string.
494
495
    """
496
    parts: List[str] = []
497
    current_dir = directory
498
    while _is_package(current_dir):
499
        # See how far up the tree we can go while still in a package.
500
        parts.insert(0, current_dir.stem)
501
        current_dir = current_dir.parent
502
    return '.'.join(parts)
503
504
505 View Code Duplication
def _find_attribute_docstring(lines: List[str]) -> Optional[str]:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
506
    """
507
    Find any docstring that is right above an attribute.
508
    Args:
509
        lines: the lines of code that may contain a docstring.
510
511
    Returns: a docstring (str) or None.
512
513
    """
514
    result = None
515
    if lines:
516
        joined_lines = ''.join(lines).strip()
517
        docstring_pattern = re.compile(
518
            r'("{3}\s*([\s\S]+)\s*"{3}|'  # 2: docstring content.
519
            r'\'{3}\s*([\s\S]+)\s*\'{3})'  # 3: docstring content.
520
            r'$'
521
        )
522
        match = docstring_pattern.match(joined_lines)
523
        if match:
524
            result = (match.group(2) or match.group(3)).strip()
525
    return result
526
527
528
def _ensure_set(arg: Union[object, Iterable[object]]) -> Set[object]:
529
    # Make sure that arg is a set.
530
    result = arg or set()
531
    if not isinstance(result, Iterable):
532
        result = {result}
533
    else:
534
        result = set(result)
535
    return result
536
537
538 View Code Duplication
def _discover_list(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
539
        what_: List[type],
540
        source: Union[Path, str, Module, Iterable[Module]],
541
        **kwargs) -> List[type]:
542
    args = getattr(what_, '__args__', None) or [Any]
543
    signature = args[0]
544
    if signature in (type, Type) or isinstance(signature, TypeVar):
545
        signature = Any
546
    kwargs['signature'] = signature
547
    return discover_classes(source, **kwargs)
548