Passed
Pull Request — master (#3)
by Guibert
49s
created

async_btree.analyze   A

Complexity

Total Complexity 17

Size/Duplication

Total Lines 102
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 17
eloc 55
dl 0
loc 102
rs 10
c 0
b 0
f 0

2 Functions

Rating   Name   Duplication   Size   Complexity  
C analyze() 0 52 10
B print_analyze() 0 17 6

1 Method

Rating   Name   Duplication   Size   Complexity  
A Node.__str__() 0 2 1
1
"""Analyze definition."""
2
from inspect import getclosurevars
3
from typing import Any, List, NamedTuple, Tuple, no_type_check
4
5
from .definition import CallableFunction, NodeMetadata
6
7
8
# Names exported by `from <module> import *`: the analysis entry point,
# the textual printer and the node tuple type defined in this module.
__all__ = ["analyze", "print_analyze", "Node"]
9
10
11
class Node(NamedTuple):
    """Node aggregate node definition.

    - name: named operation
    - properties: a list of tuple (name, value) for definition.
    - edges: a list of tuple (name, node list) for definition.

    `str(node)` yields the same text that `print_analyze(node)` writes to
    standard output (without the final newline).
    """

    name: str
    properties: List[Tuple[str, Any]]
    # Intended type is List[Tuple[str, List['Node']]], but recursive
    # definitions are not expressible here, hence List[Any]:
    # https://github.com/python/mypy/issues/731
    edges: List[Tuple[str, List[Any]]]

    def __str__(self):
        """Return the textual tree representation of this node.

        `print_analyze` prints and returns None, so returning its result
        directly makes `str(node)` fail with
        "TypeError: __str__ returned non-string". The printed output is
        captured and returned instead.
        """
        # Function-scope imports keep this fix self-contained.
        import io
        from contextlib import redirect_stdout

        buffer = io.StringIO()
        # NOTE: redirect_stdout swaps sys.stdout process-wide for the duration
        # of the call; the body is synchronous so nothing else in the same
        # event loop runs meanwhile.
        with redirect_stdout(buffer):
            print_analyze(a_node=self)
        # Drop the trailing newline so that print(node) matches print_analyze(node).
        return buffer.getvalue().rstrip("\n")
27
28
29
# pylint: disable=protected-access
30
@no_type_check  # it's a shortcut for hasattr ...
def analyze(target: CallableFunction) -> Node:
    """Analyze specified target and return a Node representation.

    A target carrying a `__node_metadata` attribute is described from that
    metadata (name, properties and edges are looked up in the target's
    closure variables). Any other target is described as a simple function:
    its name stripped of leading underscores, with every closure variable
    reported as a property and no edges.

    # Parameters
    - target (CallableFunction): async function to analyze

    # Returns
    (Node) a definition

    # Raises
    RuntimeError: if `__node_metadata` is present but is not a NodeMetadata.
    """
    try:
        nonlocals = getclosurevars(target).nonlocals
    except TypeError:
        # getclosurevars only accepts Python functions and methods. Other
        # callables (functools.partial, callable instances, builtins...) have
        # no closure to inspect, so they are analyzed without properties
        # instead of failing.
        nonlocals = {}

    def _analyze_property(p):
        """Return a tuple (name, value) or (name, function name) as property."""
        value = nonlocals.get(p)
        if value and callable(value):
            # Some callables have no __name__ (e.g. functools.partial):
            # report the object itself rather than raising AttributeError.
            return p, getattr(value, "__name__", value)
        return p, value

    def _analyze_edges(edge_name):
        """Lookup children node from edge_name closure variable.

        Returns (edge_name, None) when the variable is missing or falsy.
        """
        edge = nonlocals.get(edge_name)
        if not edge:
            return edge_name, None
        # it could be a collection of node
        if hasattr(edge, "__iter__"):
            return edge_name, [analyze(child) for child in edge]
        # or a single node
        return edge_name, [analyze(edge)]

    if hasattr(target, "__node_metadata"):
        # its a node construct.
        node = target.__node_metadata
        if not isinstance(node, NodeMetadata):
            raise RuntimeError(
                f'attr __node_metadata of {target} is not a NodeMetadata!'
            )
        return Node(
            name=node.name,
            properties=[_analyze_property(name) for name in node.properties],
            # _analyze_edges always returns a tuple, so no None filtering is
            # needed; edges without children are kept as (name, None).
            edges=[_analyze_edges(name) for name in node.edges],
        )

    # simple function
    return Node(
        name=getattr(target, "__name__", "anonymous").lstrip("_"),
        properties=[_analyze_property(name) for name in nonlocals],
        edges=[],
    )
83
84
85
def print_analyze(a_node: Node, indent=0, label=None) -> None:
    """Write an indented, textual tree view of a node to standard output.

    # Parameters
    - a_node (Node): node to display
    - indent (int): nesting depth, each level shifts the output by 4 spaces
    - label (str): name of the edge leading to this node, shown in the
      header line when given

    # Returns
    None, the representation is printed.
    """
    step = '    '
    margin = f'{step * indent} '

    if not label:
        print(f"{margin}--> {a_node.name}:")
    else:
        print(f"{margin}--({label})--> {a_node.name}:")
        # Shift the properties further right, past the edge label.
        margin = f"{margin}{step}{' ' * len(label)}"

    for prop_name, prop_value in a_node.properties:
        print(f"{margin}    {prop_name}: {prop_value}")

    for edge_label, children in a_node.edges:
        # An edge may carry None or an empty list: nothing to display then.
        for child in children or ():
            print_analyze(a_node=child, indent=indent + 1, label=edge_label)
102