Passed
Push — master ( 19896e...28cd9e )
by Guibert
58s queued 11s
created

async_btree.analyze.print_analyze()   B

Complexity

Conditions 6

Size

Total Lines 17
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 13
dl 0
loc 17
rs 8.6666
c 0
b 0
f 0
cc 6
nop 3
1
"""Analyze definition."""
2
from inspect import getclosurevars
3
from typing import Any, List, NamedTuple, Tuple, no_type_check
4
5
from .definition import CallableFunction, NodeMetadata
6
7
8
__all__ = ["analyze", "stringify_analyze", "Node"]
9
10
11
class Node(NamedTuple):
12
    """Node aggregate node definition.
13
14
    - name: named operation
15
    - properties: a list of tuple (name, value) for definition.
16
    - edges: a list of tuple (name, node list) for definition.
17
    """
18
19
    name: str
20
    properties: List[Tuple[str, Any]]
21
    # edges: List[Tuple[str, List['Node']]]
22
    # https://github.com/python/mypy/issues/731
23
    edges: List[Tuple[str, List[Any]]]
24
25
    def __str__(self):
26
        return stringify_analyze(a_node=self)
27
28
29
# pylint: disable=protected-access
30
@no_type_check  # it's a shortcut for hasattr ...
31
def analyze(target: CallableFunction) -> Node:
32
    """Analyze specified target and return a Node representation.
33
34
    # Parameters
35
    - target (CallableFunction): async function to analyze
36
37
    # Returns
38
    (Node) a defintion
39
    """
40
41
    nonlocals = getclosurevars(target).nonlocals
42
43
    def _analyze_property(p):
44
        """Return a tuple (name, value) or (name, function name) as property."""
45
        value = nonlocals[p] if p in nonlocals else None
46
        return p, value.__name__ if value and callable(value) else value
47
48
    def _analyze_edges(egde_name):
49
        """Lookup children node from egde_name local var."""
50
        value = None
51
        if egde_name in nonlocals and nonlocals[egde_name]:
52
            edge = nonlocals[egde_name]
53
            # it could be a collection of node
54
            if hasattr(edge, "__iter__"):
55
                value = list(map(analyze, edge))
56
            else:  # or a single node
57
                value = [analyze(edge)]
58
        return (egde_name, value)
59
60
    if hasattr(target, "__node_metadata"):
61
        # its a node construct.
62
        node = target.__node_metadata
63
        if not isinstance(node, NodeMetadata):
64
            raise RuntimeError(
65
                f'attr __node_metadata of {target} is not a NodeMetadata!'
66
            )
67
        return Node(
68
            name=node.name,
69
            properties=list(map(_analyze_property, node.properties)),
70
            edges=list(
71
                filter(lambda p: p is not None, map(_analyze_edges, node.edges))
72
            ),
73
        )
74
75
    # simple function
76
    return Node(
77
        name=target.__name__.lstrip("_")
78
        if hasattr(target, "__name__")
79
        else "anonymous",
80
        properties=list(map(_analyze_property, nonlocals.keys())),
81
        edges=[],
82
    )
83
84
85
def stringify_analyze(a_node: Node, indent=0, label=None) -> str:
86
    """Print a textual representation of a Node."""
87
    _ident = '    '
88
    _space = f'{_ident * indent} '
89
    result: str = ''
90
    if label:
91
        result += f'{_space}--({label})--> {a_node.name}:\n'
92
        _space += f"{_ident}{' ' * len(label)}"
93
    else:
94
        result += f'{_space}--> {a_node.name}:\n'
95
96
    for k, v in a_node.properties:
97
        result += f'{_space}    {k}: {v}\n'
98
99
    for _label, children in a_node.edges:
100
        if children:
101
            for child in children:
102
                result += stringify_analyze(
103
                    a_node=child, indent=indent + 1, label=_label
104
                )
105
    return result
106