async_btree.analyze   A
last analyzed

Complexity

Total Complexity 20

Size/Duplication

Total Lines 126
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 57
dl 0
loc 126
rs 10
c 0
b 0
f 0
wmc 20

1 Method

Rating   Name   Duplication   Size   Complexity  
A Node.__str__() 0 2 1

5 Functions

Rating   Name   Duplication   Size   Complexity  
A _get_target_propertie_name() 0 4 4
A _get_function_name() 0 2 2
B stringify_analyze() 0 28 6
A analyze() 0 37 4
A _analyze_target_edges() 0 5 3
1
"""Analyze definition."""
2
from inspect import getclosurevars
3
from typing import Any, Callable, List, NamedTuple, Optional, Tuple, no_type_check
4
5
from .definition import CallableFunction, get_node_metadata
6
7
__all__ = ["analyze", "stringify_analyze", "Node"]
8
9
_DEFAULT_EDGES = ['child', 'children', '_child', '_children']
10
11
12
class Node(NamedTuple):
13
    """Node aggregate node definition implemented with NamedTuple.
14
15
    A Node is used to keep information on name, properties, and relations ship
16
    between a hierachical construct of functions.
17
    It's like an instance of NodeMetadata.
18
19
    Attributes:
20
        name (str): named operation.
21
        properties (List[Tuple[str, Any]]): a list of tuple (name, value) for definition.
22
        edges (List[Tuple[str, List[Any]]]): a list of tuple (name, node list) for
23
            definition.
24
25
    Notes:
26
        Edges attribut should be edges: ```List[Tuple[str, List['Node']]]```
27
        But it is impossible for now, see [mypy issues 731](https://github.com/python/mypy/issues/731)
28
    """
29
30
    name: str
31
    properties: List[Tuple[str, Any]]
32
    # edges: List[Tuple[str, List['Node']]]
33
    # https://github.com/python/mypy/issues/731
34
    edges: List[Tuple[str, List[Any]]]
35
36
    def __str__(self):
37
        return stringify_analyze(target=self)
38
39
40
def _get_function_name(target: Callable) -> str:
41
    return target.__name__.lstrip("_") if hasattr(target, "__name__") else "anonymous"
42
43
44
def _get_target_propertie_name(value):
45
    if value and callable(value):
46
        return get_node_metadata(target=value).name if hasattr(value, "__node_metadata") else _get_function_name(value)
47
    return value
48
49
50
def _analyze_target_edges(edges):
51
    if edges:
52
        # it could be a collection of node or a single node
53
        return list(map(analyze, edges if hasattr(edges, "__iter__") else [edges]))
54
    return None
55
56
57
# pylint: disable=protected-access
58
@no_type_check  # it's a shortcut for hasattr ...
59
def analyze(target: CallableFunction) -> Node:
60
    """Analyze specified target and return a Node representation.
61
62
    Args:
63
        target (CallableFunction): async function to analyze.
64
65
    Returns:
66
        (Node): a node instance representation of target function
67
    """
68
69
    nonlocals = getclosurevars(target).nonlocals
70
71
    def _get_nonlocals_value_for(name):
72
        return nonlocals.get(name, None)
73
74
    def _analyze_property(p):
75
        """Return a tuple (name, value) or (name, function name) as property."""
76
        value = _get_nonlocals_value_for(name=p)
77
        return p.lstrip('_'), _get_target_propertie_name(value=value)
78
79
    def _analyze_edges(egde_name):
80
        """Lookup children node from egde_name local var."""
81
        edges = _get_nonlocals_value_for(name=egde_name)
82
        return (egde_name.lstrip('_'), _analyze_target_edges(edges=edges))
83
84
    if hasattr(target, "__node_metadata"):
85
        node = get_node_metadata(target=target)
86
        return Node(
87
            name=node.name,
88
            properties=list(map(_analyze_property, node.properties)) if node.properties else [],
89
            edges=list(filter(lambda p: p is not None, map(_analyze_edges, node.edges or _DEFAULT_EDGES))),
90
        )
91
92
    # simple function
93
    return Node(
94
        name=_get_function_name(target=target), properties=list(map(_analyze_property, nonlocals.keys())), edges=[]
95
    )
96
97
98
def stringify_analyze(target: Node, indent: int = 0, label: Optional[str] = None) -> str:
99
    """Stringify node representation of specified target.
100
101
    Args:
102
        target (CallableFunction): async function to analyze.
103
        indent (int): level identation (default to zero).
104
        label (Optional[str]): label of current node (default None).
105
106
    Returns:
107
        (str): a string node representation.
108
    """
109
    _ident = '    '
110
    _space = f'{_ident * indent} '
111
    result: str = ''
112
    if label:
113
        result += f'{_space}--({label})--> {target.name}:\n'
114
        _space += f"{_ident}{' ' * len(label)}"
115
    else:
116
        result += f'{_space}--> {target.name}:\n'
117
118
    for k, v in target.properties:
119
        result += f'{_space}    {k}: {v}\n'
120
121
    for _label, children in target.edges:
122
        if children:
123
            for child in children:
124
                result += stringify_analyze(target=child, indent=indent + 1, label=_label)
125
    return result
126